Merge "Enabling Remote RPC Router module in ODL distribution."
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16
17 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
18 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
19 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
20 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
21 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
22 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
25 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
26 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
27 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
31 import org.opendaylight.yangtools.yang.common.QName;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
36 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
37 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
38 import org.opendaylight.yangtools.yang.model.api.Module;
39 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.common.base.Optional;
46 import com.google.common.collect.FluentIterable;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.ImmutableSet;
49
50 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
51
52     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
53
54     private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
55             "2013-07-09", "context-reference");
56     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
57     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
58
59
60     private final String identifier;
61     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
62     private RpcImplementation defaultImplementation;
63     private SchemaContextProvider schemaProvider;
64     private RoutedRpcDefaultImplementation defaultDelegate;
65
66     public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
67         super();
68         this.identifier = identifier;
69         this.schemaProvider = schemaProvider;
70     }
71
72     public RpcImplementation getDefaultImplementation() {
73         return defaultImplementation;
74     }
75
76     public void setDefaultImplementation(RpcImplementation defaultImplementation) {
77         this.defaultImplementation = defaultImplementation;
78     }
79
80     public SchemaContextProvider getSchemaProvider() {
81         return schemaProvider;
82     }
83
84     public void setSchemaProvider(SchemaContextProvider schemaProvider) {
85         this.schemaProvider = schemaProvider;
86     }
87
88   public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
89     return defaultDelegate;
90   }
91
92     @Override
93   public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
94     this.defaultDelegate = defaultDelegate;
95   }
96
97   @Override
98     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
99         checkArgument(rpcType != null, "RPC Type should not be null");
100         checkArgument(implementation != null, "RPC Implementatoin should not be null");
101         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
102     }
103
104     private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
105         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
106         if (potential != null) {
107             return potential;
108         }
109         synchronized (implementations) {
110             potential = getRoutedRpcRouter(rpcType);
111             if (potential != null) {
112                 return potential;
113             }
114             RpcDefinition definition = findRpcDefinition(rpcType);
115             RoutingStrategy strategy = getRoutingStrategy(definition);
116             checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
117             potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
118             implementations.put(rpcType, potential);
119             return potential;
120         }
121     }
122
123     private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
124         RpcImplementation potential = implementations.get(rpcType);
125         if (potential != null) {
126             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
127             return (RoutedRpcSelector) potential;
128         }
129         return null;
130
131     }
132
133     @Override
134     public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
135             throws IllegalArgumentException {
136         checkArgument(rpcType != null, "RPC Type should not be null");
137         checkArgument(implementation != null, "RPC Implementatoin should not be null");
138         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
139         RpcDefinition definition = findRpcDefinition(rpcType);
140         checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
141         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
142         implementations.putIfAbsent(rpcType, implementation);
143         return reg;
144     }
145
146     private boolean isRoutedRpc(RpcDefinition definition) {
147         return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
148     }
149
150     @Override
151     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
152         return rpcRegistrationListeners.register(listener);
153     }
154
155     @Override
156     public String getIdentifier() {
157         return identifier;
158     }
159
160     @Override
161     public Set<QName> getSupportedRpcs() {
162         return ImmutableSet.copyOf(implementations.keySet());
163     }
164
165     @Override
166     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
167         return findRpcImplemention(rpc).invokeRpc(rpc, input);
168     }
169
170     private RpcImplementation findRpcImplemention(QName rpc) {
171         checkArgument(rpc != null, "Rpc name should not be null");
172         RpcImplementation potentialImpl = implementations.get(rpc);
173         if (potentialImpl != null) {
174             return potentialImpl;
175         }
176         potentialImpl = defaultImplementation;
177         checkState(potentialImpl != null, "Implementation is not available.");
178         return potentialImpl;
179     }
180
181     private boolean hasRpcImplementation(QName rpc) {
182         return implementations.containsKey(rpc);
183     }
184
185     private RpcDefinition findRpcDefinition(QName rpcType) {
186         checkArgument(rpcType != null, "Rpc name must be supplied.");
187         checkState(schemaProvider != null, "Schema Provider is not available.");
188         SchemaContext ctx = schemaProvider.getSchemaContext();
189         checkState(ctx != null, "YANG Schema Context is not available.");
190         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
191         checkState(module != null, "YANG Module is not available.");
192         return findRpcDefinition(rpcType, module.getRpcs());
193     }
194
195     static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
196         checkState(rpcs != null, "Rpc schema is not available.");
197         for (RpcDefinition rpc : rpcs) {
198             if (rpcType.equals(rpc.getQName())) {
199                 return rpc;
200             }
201         }
202         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
203     }
204
205     private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
206         ContainerSchemaNode input = rpc.getInput();
207         if (input != null) {
208             for (DataSchemaNode schemaNode : input.getChildNodes()) {
209                 Optional<QName> context = getRoutingContext(schemaNode);
210                 if (context.isPresent()) {
211                     return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
212                 }
213             }
214         }
215         return createGlobalStrategy(rpc);
216     }
217
218     private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
219         return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
220     }
221
222     private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
223         for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
224             if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
225                 return Optional.fromNullable(extension.getQName());
226             }
227             ;
228         }
229         return Optional.absent();
230     }
231
232     private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
233         GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
234         return ret;
235     }
236
237     @Override
238     public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
239       checkState(defaultDelegate != null);
240       return defaultDelegate.invokeRpc(rpc, identifier, input);
241     }
242
243     private static abstract class RoutingStrategy implements Identifiable<QName> {
244
245         private final QName identifier;
246
247         public RoutingStrategy(QName identifier) {
248             super();
249             this.identifier = identifier;
250         }
251
252         @Override
253         public QName getIdentifier() {
254             return identifier;
255         }
256     }
257
258     private static class GlobalRpcStrategy extends RoutingStrategy {
259
260         public GlobalRpcStrategy(QName identifier) {
261             super(identifier);
262         }
263     }
264
265     private static class RoutedRpcStrategy extends RoutingStrategy {
266
267         private final QName context;
268         private final QName leaf;
269
270         public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
271             super(identifier);
272             this.context = ctx;
273             this.leaf = leaf;
274         }
275
276         public QName getContext() {
277             return context;
278         }
279
280         public QName getLeaf() {
281             return leaf;
282         }
283     }
284
285     private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
286
287         private final RoutedRpcStrategy strategy;
288         private final Set<QName> supportedRpcs;
289         private final RpcRoutingContext identifier;
290         private RpcImplementation defaultDelegate;
291         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
292         private final SchemaAwareRpcBroker router;
293
294         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
295             super();
296             this.strategy = strategy;
297             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
298             identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
299             this.router = router;
300         }
301
302         @Override
303         public RpcRoutingContext getIdentifier() {
304             return identifier;
305         }
306
307         @Override
308         public void close() throws Exception {
309
310         }
311
312         @Override
313         public Set<QName> getSupportedRpcs() {
314             return supportedRpcs;
315         }
316
317         public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
318             return new RoutedRpcRegImpl(rpcType, implementation, this);
319         }
320
321         @Override
322         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
323             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
324             checkArgument(inputContainer != null, "Rpc payload must contain input element");
325             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
326             checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
327             Object route = routeContainer.getValue();
328             checkArgument(route instanceof InstanceIdentifier);
329             RpcImplementation potential = null;
330             if (route != null) {
331                 RoutedRpcRegImpl potentialReg = implementations.get(route);
332                 if (potentialReg != null) {
333                     potential = potentialReg.getInstance();
334                 }
335             }
336             if (potential == null) {
337                 return router.invokeRpc(rpc, (InstanceIdentifier) route, input);
338             }
339             checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
340             return potential.invokeRpc(rpc, input);
341         }
342
343         public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
344             //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
345             RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
346             if (previous == null) {
347                 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
348             }
349
350         }
351
352         public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
353             boolean removed = implementations.remove(path, routedRpcRegImpl);
354             if (removed) {
355                 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
356             }
357         }
358     }
359
360     private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
361             RpcRegistration {
362         private final QName type;
363         private SchemaAwareRpcBroker router;
364
365         public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
366             super(instance);
367             this.type = type;
368             this.router = router;
369         }
370
371         @Override
372         public QName getType() {
373             return type;
374         }
375
376         @Override
377         protected void removeRegistration() {
378             if (router != null) {
379                 router.remove(this);
380                 router = null;
381             }
382         }
383     }
384
385     private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
386             RoutedRpcRegistration {
387
388         private final QName type;
389         private final RoutedRpcSelector router;
390
391         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
392             super(implementation);
393             this.type = rpcType;
394             router = routedRpcSelector;
395         }
396
397         @Override
398         public void registerPath(QName context, InstanceIdentifier path) {
399             router.addPath(context, path, this);
400         }
401
402         @Override
403         public void unregisterPath(QName context, InstanceIdentifier path) {
404             router.removePath(context, path, this);
405         }
406
407         @Override
408         protected void removeRegistration() {
409
410         }
411
412         @Override
413         public QName getType() {
414             return type;
415         }
416
417     }
418
419     private void remove(GlobalRpcRegistration registration) {
420         implementations.remove(registration.getType(), registration);
421     }
422
423     private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
424         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
425         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
426         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
427             try {
428                 routeListener.getInstance().onRouteChange(change);
429             } catch (Exception e) {
430                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
431
432             }
433         }
434
435     }
436
437
438
439     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
440         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
441         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
442         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
443             try {
444                 routeListener.getInstance().onRouteChange(change);
445             } catch (Exception e) {
446                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
447             }
448         }
449     }
450
451     @Override
452     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
453             L listener) {
454         ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
455         RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
456         try {
457         listener.onRouteChange(initial);
458         } catch (Exception e) {
459             LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener);
460         }
461         return reg;
462     }
463
464     private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
465         FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
466
467
468         ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
469         ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
470         for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
471             final RpcRoutingContext context = routedRpcSelector.getIdentifier();
472             final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
473             announcements.put(context, paths);
474         }
475         return RoutingUtils.change(announcements.build(), removals.build());
476     }
477 }