Adding routed RPC support in Remote RPC Router
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16
17 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
18 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
19 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
20 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
21 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
22 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
25 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
26 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
27 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
31 import org.opendaylight.yangtools.yang.common.QName;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
36 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
37 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
38 import org.opendaylight.yangtools.yang.model.api.Module;
39 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.common.base.Optional;
46 import com.google.common.collect.ImmutableSet;
47
48 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
49
50     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
51
52     private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
53             "2013-07-09", "context-reference");
54     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
55     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
56     
57
58     private final String identifier;
59     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
60     private RpcImplementation defaultImplementation;
61     private SchemaContextProvider schemaProvider;
62     private RoutedRpcDefaultImplementation defaultDelegate;
63
64     public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
65         super();
66         this.identifier = identifier;
67         this.schemaProvider = schemaProvider;
68     }
69
70     public RpcImplementation getDefaultImplementation() {
71         return defaultImplementation;
72     }
73
74     public void setDefaultImplementation(RpcImplementation defaultImplementation) {
75         this.defaultImplementation = defaultImplementation;
76     }
77
78     public SchemaContextProvider getSchemaProvider() {
79         return schemaProvider;
80     }
81
82     public void setSchemaProvider(SchemaContextProvider schemaProvider) {
83         this.schemaProvider = schemaProvider;
84     }
85
86   public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
87     return defaultDelegate;
88   }
89
90     @Override
91   public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
92     this.defaultDelegate = defaultDelegate;
93   }
94
95   @Override
96     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
97         checkArgument(rpcType != null, "RPC Type should not be null");
98         checkArgument(implementation != null, "RPC Implementatoin should not be null");
99         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
100     }
101
102     private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
103         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
104         if (potential != null) {
105             return potential;
106         }
107         synchronized (implementations) {
108             potential = getRoutedRpcRouter(rpcType);
109             if (potential != null) {
110                 return potential;
111             }
112             RpcDefinition definition = findRpcDefinition(rpcType);
113             RoutingStrategy strategy = getRoutingStrategy(definition);
114             checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
115             potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
116             implementations.put(rpcType, potential);
117             return potential;
118         }
119     }
120
121     private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
122         RpcImplementation potential = implementations.get(rpcType);
123         if (potential != null) {
124             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
125             return (RoutedRpcSelector) potential;
126         }
127         return null;
128
129     }
130
131     @Override
132     public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
133             throws IllegalArgumentException {
134         checkArgument(rpcType != null, "RPC Type should not be null");
135         checkArgument(implementation != null, "RPC Implementatoin should not be null");
136         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
137         RpcDefinition definition = findRpcDefinition(rpcType);
138         checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
139         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
140         implementations.putIfAbsent(rpcType, implementation);
141         return reg;
142     }
143
144     private boolean isRoutedRpc(RpcDefinition definition) {
145         return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
146     }
147
148     @Override
149     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
150         return rpcRegistrationListeners.register(listener);
151     }
152
153     @Override
154     public String getIdentifier() {
155         return identifier;
156     }
157
158     @Override
159     public Set<QName> getSupportedRpcs() {
160         return ImmutableSet.copyOf(implementations.keySet());
161     }
162
163     @Override
164     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
165         return findRpcImplemention(rpc).invokeRpc(rpc, input);
166     }
167
168     private RpcImplementation findRpcImplemention(QName rpc) {
169         checkArgument(rpc != null, "Rpc name should not be null");
170         RpcImplementation potentialImpl = implementations.get(rpc);
171         if (potentialImpl != null) {
172             return potentialImpl;
173         }
174         potentialImpl = defaultImplementation;
175         checkState(potentialImpl != null, "Implementation is not available.");
176         return potentialImpl;
177     }
178
179     private boolean hasRpcImplementation(QName rpc) {
180         return implementations.containsKey(rpc);
181     }
182
183     private RpcDefinition findRpcDefinition(QName rpcType) {
184         checkArgument(rpcType != null, "Rpc name must be supplied.");
185         checkState(schemaProvider != null, "Schema Provider is not available.");
186         SchemaContext ctx = schemaProvider.getSchemaContext();
187         checkState(ctx != null, "YANG Schema Context is not available.");
188         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
189         checkState(module != null, "YANG Module is not available.");
190         return findRpcDefinition(rpcType, module.getRpcs());
191     }
192
193     static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
194         checkState(rpcs != null, "Rpc schema is not available.");
195         for (RpcDefinition rpc : rpcs) {
196             if (rpcType.equals(rpc.getQName())) {
197                 return rpc;
198             }
199         }
200         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
201     }
202
203     private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
204         ContainerSchemaNode input = rpc.getInput();
205         if (input != null) {
206             for (DataSchemaNode schemaNode : input.getChildNodes()) {
207                 Optional<QName> context = getRoutingContext(schemaNode);
208                 if (context.isPresent()) {
209                     return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
210                 }
211             }
212         }
213         return createGlobalStrategy(rpc);
214     }
215
216     private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
217         return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
218     }
219
220     private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
221         for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
222             if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
223                 return Optional.fromNullable(extension.getQName());
224             }
225             ;
226         }
227         return Optional.absent();
228     }
229
230     private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
231         GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
232         return ret;
233     }
234
235     @Override
236     public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
237       checkState(defaultDelegate != null);
238       return defaultDelegate.invokeRpc(rpc, identifier, input);
239     }
240
241     private static abstract class RoutingStrategy implements Identifiable<QName> {
242
243         private final QName identifier;
244
245         public RoutingStrategy(QName identifier) {
246             super();
247             this.identifier = identifier;
248         }
249
250         @Override
251         public QName getIdentifier() {
252             return identifier;
253         }
254     }
255
256     private static class GlobalRpcStrategy extends RoutingStrategy {
257
258         public GlobalRpcStrategy(QName identifier) {
259             super(identifier);
260         }
261     }
262
263     private static class RoutedRpcStrategy extends RoutingStrategy {
264
265         private final QName context;
266         private final QName leaf;
267
268         public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
269             super(identifier);
270             this.context = ctx;
271             this.leaf = leaf;
272         }
273
274         public QName getContext() {
275             return context;
276         }
277
278         public QName getLeaf() {
279             return leaf;
280         }
281     }
282
283     private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
284
285         private final RoutedRpcStrategy strategy;
286         private final Set<QName> supportedRpcs;
287         private RpcImplementation defaultDelegate;
288         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
289         private SchemaAwareRpcBroker router;
290
291         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
292             super();
293             this.strategy = strategy;
294             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
295             this.router = router;
296         }
297
298         @Override
299         public QName getIdentifier() {
300             return strategy.getIdentifier();
301         }
302
303         @Override
304         public void close() throws Exception {
305
306         }
307
308         @Override
309         public Set<QName> getSupportedRpcs() {
310             return supportedRpcs;
311         }
312
313         public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
314             return new RoutedRpcRegImpl(rpcType, implementation, this);
315         }
316
317         @Override
318         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
319             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
320             checkArgument(inputContainer != null, "Rpc payload must contain input element");
321             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
322             checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
323             Object route = routeContainer.getValue();
324             checkArgument(route instanceof InstanceIdentifier);
325             RpcImplementation potential = null;
326             if (route != null) {
327                 RoutedRpcRegImpl potentialReg = implementations.get(route);
328                 if (potentialReg != null) {
329                     potential = potentialReg.getInstance();
330                 }
331             }
332             if (potential == null) {
333                 return router.invokeRpc(rpc, (InstanceIdentifier) route, input);
334             }
335             checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
336             return potential.invokeRpc(rpc, input);
337         }
338
339         public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
340             //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
341             RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
342             if (previous == null) {
343                 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
344             }
345
346         }
347
348         public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
349             boolean removed = implementations.remove(path, routedRpcRegImpl);
350             if (removed) {
351                 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
352             }
353         }
354     }
355
356     private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
357             RpcRegistration {
358         private final QName type;
359         private SchemaAwareRpcBroker router;
360
361         public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
362             super(instance);
363             this.type = type;
364             this.router = router;
365         }
366
367         @Override
368         public QName getType() {
369             return type;
370         }
371
372         @Override
373         protected void removeRegistration() {
374             if (router != null) {
375                 router.remove(this);
376                 router = null;
377             }
378         }
379     }
380
381     private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
382             RoutedRpcRegistration {
383
384         private final QName type;
385         private RoutedRpcSelector router;
386
387         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
388             super(implementation);
389             this.type = rpcType;
390             router = routedRpcSelector;
391         }
392
393         @Override
394         public void registerPath(QName context, InstanceIdentifier path) {
395             router.addPath(context, path, this);
396         }
397
398         @Override
399         public void unregisterPath(QName context, InstanceIdentifier path) {
400             router.removePath(context, path, this);
401         }
402
403         @Override
404         protected void removeRegistration() {
405
406         }
407
408         @Override
409         public QName getType() {
410             return type;
411         }
412
413     }
414
415     private void remove(GlobalRpcRegistration registration) {
416         implementations.remove(registration.getType(), registration);
417     }
418
419     private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
420         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
421         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
422         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
423             try {
424                 routeListener.getInstance().onRouteChange(change);
425             } catch (Exception e) {
426                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
427                 
428             }
429         }
430         
431     }
432
433     
434
435     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
436         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
437         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
438         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
439             try {
440                 routeListener.getInstance().onRouteChange(change);
441             } catch (Exception e) {
442                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
443             }
444         }
445     }
446     
447     @Override
448     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
449             L listener) {
450         return routeChangeListeners.registerWithType(listener);
451     }
452 }