Merge "Initial mdsal and config karaf features"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16
17 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
18 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
19 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
20 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
21 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
22 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
23 import org.opendaylight.controller.sal.core.api.RpcImplementation;
24 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
25 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
26 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
27 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
31 import org.opendaylight.yangtools.yang.common.QName;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
36 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
37 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
38 import org.opendaylight.yangtools.yang.model.api.Module;
39 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.common.base.Optional;
46 import com.google.common.collect.FluentIterable;
47 import com.google.common.collect.ImmutableMap;
48 import com.google.common.collect.ImmutableSet;
49 import com.google.common.util.concurrent.ListenableFuture;
50
51 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
52
53     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
54
55     private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
56             "2013-07-09", "context-reference");
57     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
58     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
59
60
61     private final String identifier;
62     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
63     private RpcImplementation defaultImplementation;
64     private SchemaContextProvider schemaProvider;
65     private RoutedRpcDefaultImplementation defaultDelegate;
66
67     public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
68         super();
69         this.identifier = identifier;
70         this.schemaProvider = schemaProvider;
71     }
72
73     public RpcImplementation getDefaultImplementation() {
74         return defaultImplementation;
75     }
76
77     public void setDefaultImplementation(RpcImplementation defaultImplementation) {
78         this.defaultImplementation = defaultImplementation;
79     }
80
81     public SchemaContextProvider getSchemaProvider() {
82         return schemaProvider;
83     }
84
85     public void setSchemaProvider(SchemaContextProvider schemaProvider) {
86         this.schemaProvider = schemaProvider;
87     }
88
89     public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
90         return defaultDelegate;
91     }
92
93     @Override
94     public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
95         this.defaultDelegate = defaultDelegate;
96     }
97
98     @Override
99     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
100         checkArgument(rpcType != null, "RPC Type should not be null");
101         checkArgument(implementation != null, "RPC Implementatoin should not be null");
102         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
103     }
104
105     private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
106         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
107         if (potential != null) {
108             return potential;
109         }
110         synchronized (implementations) {
111             potential = getRoutedRpcRouter(rpcType);
112             if (potential != null) {
113                 return potential;
114             }
115             RpcDefinition definition = findRpcDefinition(rpcType);
116             RoutingStrategy strategy = getRoutingStrategy(definition);
117             checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
118             potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
119             implementations.put(rpcType, potential);
120             return potential;
121         }
122     }
123
124     private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
125         RpcImplementation potential = implementations.get(rpcType);
126         if (potential != null) {
127             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
128             return (RoutedRpcSelector) potential;
129         }
130         return null;
131
132     }
133
134     @Override
135     public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
136             throws IllegalArgumentException {
137         checkArgument(rpcType != null, "RPC Type should not be null");
138         checkArgument(implementation != null, "RPC Implementatoin should not be null");
139         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
140         RpcDefinition definition = findRpcDefinition(rpcType);
141         checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
142         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
143         implementations.putIfAbsent(rpcType, implementation);
144         return reg;
145     }
146
147     private boolean isRoutedRpc(RpcDefinition definition) {
148         return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
149     }
150
151     @Override
152     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
153         return rpcRegistrationListeners.register(listener);
154     }
155
156     @Override
157     public String getIdentifier() {
158         return identifier;
159     }
160
161     @Override
162     public Set<QName> getSupportedRpcs() {
163         return ImmutableSet.copyOf(implementations.keySet());
164     }
165
166     @Override
167     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
168         return findRpcImplemention(rpc).invokeRpc(rpc, input);
169     }
170
171     private RpcImplementation findRpcImplemention(QName rpc) {
172         checkArgument(rpc != null, "Rpc name should not be null");
173         RpcImplementation potentialImpl = implementations.get(rpc);
174         if (potentialImpl != null) {
175             return potentialImpl;
176         }
177         potentialImpl = defaultImplementation;
178         checkState(potentialImpl != null, "Implementation is not available.");
179         return potentialImpl;
180     }
181
182     private boolean hasRpcImplementation(QName rpc) {
183         return implementations.containsKey(rpc);
184     }
185
186     private RpcDefinition findRpcDefinition(QName rpcType) {
187         checkArgument(rpcType != null, "Rpc name must be supplied.");
188         checkState(schemaProvider != null, "Schema Provider is not available.");
189         SchemaContext ctx = schemaProvider.getSchemaContext();
190         checkState(ctx != null, "YANG Schema Context is not available.");
191         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
192         checkState(module != null, "YANG Module is not available.");
193         return findRpcDefinition(rpcType, module.getRpcs());
194     }
195
196     static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
197         checkState(rpcs != null, "Rpc schema is not available.");
198         for (RpcDefinition rpc : rpcs) {
199             if (rpcType.equals(rpc.getQName())) {
200                 return rpc;
201             }
202         }
203         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
204     }
205
206     private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
207         ContainerSchemaNode input = rpc.getInput();
208         if (input != null) {
209             for (DataSchemaNode schemaNode : input.getChildNodes()) {
210                 Optional<QName> context = getRoutingContext(schemaNode);
211                 if (context.isPresent()) {
212                     return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
213                 }
214             }
215         }
216         return createGlobalStrategy(rpc);
217     }
218
219     private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
220         return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
221     }
222
223     private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
224         for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
225             if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
226                 return Optional.fromNullable(extension.getQName());
227             }
228             ;
229         }
230         return Optional.absent();
231     }
232
233     private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
234         GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
235         return ret;
236     }
237
238     @Override
239     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
240       checkState(defaultDelegate != null);
241       return defaultDelegate.invokeRpc(rpc, identifier, input);
242     }
243
244     private static abstract class RoutingStrategy implements Identifiable<QName> {
245
246         private final QName identifier;
247
248         public RoutingStrategy(QName identifier) {
249             super();
250             this.identifier = identifier;
251         }
252
253         @Override
254         public QName getIdentifier() {
255             return identifier;
256         }
257     }
258
259     private static class GlobalRpcStrategy extends RoutingStrategy {
260
261         public GlobalRpcStrategy(QName identifier) {
262             super(identifier);
263         }
264     }
265
266     private static class RoutedRpcStrategy extends RoutingStrategy {
267
268         private final QName context;
269         private final QName leaf;
270
271         public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
272             super(identifier);
273             this.context = ctx;
274             this.leaf = leaf;
275         }
276
277         public QName getContext() {
278             return context;
279         }
280
281         public QName getLeaf() {
282             return leaf;
283         }
284     }
285
286     private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
287
288         private final RoutedRpcStrategy strategy;
289         private final Set<QName> supportedRpcs;
290         private final RpcRoutingContext identifier;
291         private RpcImplementation defaultDelegate;
292         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
293         private final SchemaAwareRpcBroker router;
294
295         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
296             super();
297             this.strategy = strategy;
298             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
299             identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
300             this.router = router;
301         }
302
303         @Override
304         public RpcRoutingContext getIdentifier() {
305             return identifier;
306         }
307
308         @Override
309         public void close() throws Exception {
310
311         }
312
313         @Override
314         public Set<QName> getSupportedRpcs() {
315             return supportedRpcs;
316         }
317
318         public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
319             return new RoutedRpcRegImpl(rpcType, implementation, this);
320         }
321
322         @Override
323         public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
324             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
325             checkArgument(inputContainer != null, "Rpc payload must contain input element");
326             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
327             checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
328             Object route = routeContainer.getValue();
329             checkArgument(route instanceof InstanceIdentifier);
330             RpcImplementation potential = null;
331             if (route != null) {
332                 RoutedRpcRegImpl potentialReg = implementations.get(route);
333                 if (potentialReg != null) {
334                     potential = potentialReg.getInstance();
335                 }
336             }
337             if (potential == null) {
338                 return router.invokeRpc(rpc, (InstanceIdentifier) route, input);
339             }
340             checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
341             return potential.invokeRpc(rpc, input);
342         }
343
344         public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
345             //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
346             RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
347             if (previous == null) {
348                 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
349             }
350
351         }
352
353         public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
354             boolean removed = implementations.remove(path, routedRpcRegImpl);
355             if (removed) {
356                 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
357             }
358         }
359     }
360
361     private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
362             RpcRegistration {
363         private final QName type;
364         private SchemaAwareRpcBroker router;
365
366         public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
367             super(instance);
368             this.type = type;
369             this.router = router;
370         }
371
372         @Override
373         public QName getType() {
374             return type;
375         }
376
377         @Override
378         protected void removeRegistration() {
379             if (router != null) {
380                 router.remove(this);
381                 router = null;
382             }
383         }
384     }
385
386     private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
387             RoutedRpcRegistration {
388
389         private final QName type;
390         private final RoutedRpcSelector router;
391
392         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
393             super(implementation);
394             this.type = rpcType;
395             router = routedRpcSelector;
396         }
397
398         @Override
399         public void registerPath(QName context, InstanceIdentifier path) {
400             router.addPath(context, path, this);
401         }
402
403         @Override
404         public void unregisterPath(QName context, InstanceIdentifier path) {
405             router.removePath(context, path, this);
406         }
407
408         @Override
409         protected void removeRegistration() {
410
411         }
412
413         @Override
414         public QName getType() {
415             return type;
416         }
417
418     }
419
420     private void remove(GlobalRpcRegistration registration) {
421         implementations.remove(registration.getType(), registration);
422     }
423
424     private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
425         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
426         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
427         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
428             try {
429                 routeListener.getInstance().onRouteChange(change);
430             } catch (Exception e) {
431                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
432
433             }
434         }
435
436     }
437
438
439
440     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
441         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
442         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
443         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
444             try {
445                 routeListener.getInstance().onRouteChange(change);
446             } catch (Exception e) {
447                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
448             }
449         }
450     }
451
452     @Override
453     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
454             L listener) {
455         ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
456         RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
457         try {
458         listener.onRouteChange(initial);
459         } catch (Exception e) {
460             LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener);
461         }
462         return reg;
463     }
464
465     private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
466         FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
467
468
469         ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
470         ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
471         for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
472             final RpcRoutingContext context = routedRpcSelector.getIdentifier();
473             final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
474             announcements.put(context, paths);
475         }
476         return RoutingUtils.change(announcements.build(), removals.build());
477     }
478 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.