Merge "Fix RPC forwarding related bugs in Binding Independent Connector"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 package org.opendaylight.controller.sal.dom.broker.impl;
2
3 import static com.google.common.base.Preconditions.checkArgument;
4 import static com.google.common.base.Preconditions.checkState;
5
6 import java.util.Set;
7 import java.util.concurrent.ConcurrentHashMap;
8 import java.util.concurrent.ConcurrentMap;
9
10 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
11 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
12 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
13 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
14 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
15 import org.opendaylight.controller.sal.core.api.RpcImplementation;
16 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
17 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
18 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
19 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
20 import org.opendaylight.yangtools.concepts.Identifiable;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
26 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
28 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
29 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
30 import org.opendaylight.yangtools.yang.model.api.Module;
31 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.base.Optional;
38 import com.google.common.collect.ImmutableSet;
39
40 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
41
42     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
43
44     private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
45             "2013-07-09", "context-reference");
46     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
47     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
48     
49
50     private final String identifier;
51     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
52     private RpcImplementation defaultImplementation;
53     private SchemaContextProvider schemaProvider;
54
55     public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
56         super();
57         this.identifier = identifier;
58         this.schemaProvider = schemaProvider;
59     }
60
61     public RpcImplementation getDefaultImplementation() {
62         return defaultImplementation;
63     }
64
65     public void setDefaultImplementation(RpcImplementation defaultImplementation) {
66         this.defaultImplementation = defaultImplementation;
67     }
68
69     public SchemaContextProvider getSchemaProvider() {
70         return schemaProvider;
71     }
72
73     public void setSchemaProvider(SchemaContextProvider schemaProvider) {
74         this.schemaProvider = schemaProvider;
75     }
76
77     @Override
78     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
79         checkArgument(rpcType != null, "RPC Type should not be null");
80         checkArgument(implementation != null, "RPC Implementatoin should not be null");
81         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
82     }
83
84     private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
85         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
86         if (potential != null) {
87             return potential;
88         }
89         synchronized (implementations) {
90             potential = getRoutedRpcRouter(rpcType);
91             if (potential != null) {
92                 return potential;
93             }
94             RpcDefinition definition = findRpcDefinition(rpcType);
95             RoutingStrategy strategy = getRoutingStrategy(definition);
96             checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
97             potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
98             implementations.put(rpcType, potential);
99             return potential;
100         }
101     }
102
103     private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
104         RpcImplementation potential = implementations.get(rpcType);
105         if (potential != null) {
106             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
107             return (RoutedRpcSelector) potential;
108         }
109         return null;
110
111     }
112
113     @Override
114     public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
115             throws IllegalArgumentException {
116         checkArgument(rpcType != null, "RPC Type should not be null");
117         checkArgument(implementation != null, "RPC Implementatoin should not be null");
118         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
119         RpcDefinition definition = findRpcDefinition(rpcType);
120         checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
121         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
122         implementations.putIfAbsent(rpcType, implementation);
123         return reg;
124     }
125
126     private boolean isRoutedRpc(RpcDefinition definition) {
127         return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
128     }
129
130     @Override
131     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
132         return rpcRegistrationListeners.register(listener);
133     }
134
135     @Override
136     public String getIdentifier() {
137         return identifier;
138     }
139
140     @Override
141     public Set<QName> getSupportedRpcs() {
142         return ImmutableSet.copyOf(implementations.keySet());
143     }
144
145     @Override
146     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
147         return findRpcImplemention(rpc).invokeRpc(rpc, input);
148     }
149
150     private RpcImplementation findRpcImplemention(QName rpc) {
151         checkArgument(rpc != null, "Rpc name should not be null");
152         RpcImplementation potentialImpl = implementations.get(rpc);
153         if (potentialImpl != null) {
154             return potentialImpl;
155         }
156         potentialImpl = defaultImplementation;
157         checkState(potentialImpl != null, "Implementation is not available.");
158         return potentialImpl;
159     }
160
161     private boolean hasRpcImplementation(QName rpc) {
162         return implementations.containsKey(rpc);
163     }
164
165     private RpcDefinition findRpcDefinition(QName rpcType) {
166         checkArgument(rpcType != null, "Rpc name must be supplied.");
167         checkState(schemaProvider != null, "Schema Provider is not available.");
168         SchemaContext ctx = schemaProvider.getSchemaContext();
169         checkState(ctx != null, "YANG Schema Context is not available.");
170         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
171         checkState(module != null, "YANG Module is not available.");
172         return findRpcDefinition(rpcType, module.getRpcs());
173     }
174
175     static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
176         checkState(rpcs != null, "Rpc schema is not available.");
177         for (RpcDefinition rpc : rpcs) {
178             if (rpcType.equals(rpc.getQName())) {
179                 return rpc;
180             }
181         }
182         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
183     }
184
185     private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
186         ContainerSchemaNode input = rpc.getInput();
187         if (input != null) {
188             for (DataSchemaNode schemaNode : input.getChildNodes()) {
189                 Optional<QName> context = getRoutingContext(schemaNode);
190                 if (context.isPresent()) {
191                     return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
192                 }
193             }
194         }
195         return createGlobalStrategy(rpc);
196     }
197
198     private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
199         return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
200     }
201
202     private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
203         for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
204             if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
205                 return Optional.fromNullable(extension.getQName());
206             }
207             ;
208         }
209         return Optional.absent();
210     }
211
212     private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
213         GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
214         return ret;
215     }
216
217     private static abstract class RoutingStrategy implements Identifiable<QName> {
218
219         private final QName identifier;
220
221         public RoutingStrategy(QName identifier) {
222             super();
223             this.identifier = identifier;
224         }
225
226         @Override
227         public QName getIdentifier() {
228             return identifier;
229         }
230     }
231
232     private static class GlobalRpcStrategy extends RoutingStrategy {
233
234         public GlobalRpcStrategy(QName identifier) {
235             super(identifier);
236         }
237     }
238
239     private static class RoutedRpcStrategy extends RoutingStrategy {
240
241         private final QName context;
242         private final QName leaf;
243
244         public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
245             super(identifier);
246             this.context = ctx;
247             this.leaf = leaf;
248         }
249
250         public QName getContext() {
251             return context;
252         }
253
254         public QName getLeaf() {
255             return leaf;
256         }
257     }
258
259     private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
260
261         private final RoutedRpcStrategy strategy;
262         private final Set<QName> supportedRpcs;
263         private RpcImplementation defaultDelegate;
264         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
265         private SchemaAwareRpcBroker router;
266
267         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
268             super();
269             this.strategy = strategy;
270             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
271             this.router = router;
272         }
273
274         @Override
275         public QName getIdentifier() {
276             return strategy.getIdentifier();
277         }
278
279         @Override
280         public void close() throws Exception {
281
282         }
283
284         @Override
285         public Set<QName> getSupportedRpcs() {
286             return supportedRpcs;
287         }
288
289         public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
290             return new RoutedRpcRegImpl(rpcType, implementation, this);
291         }
292
293         @Override
294         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
295             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
296             checkArgument(inputContainer != null, "Rpc payload must contain input element");
297             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
298             checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
299             Object route = routeContainer.getValue();
300             RpcImplementation potential = null;
301             if (route != null) {
302                 RoutedRpcRegImpl potentialReg = implementations.get(route);
303                 if (potentialReg != null) {
304                     potential = potentialReg.getInstance();
305                 }
306             }
307             if (potential == null) {
308                 potential = defaultDelegate;
309             }
310             checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
311             return potential.invokeRpc(rpc, input);
312         }
313
314         public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
315             //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
316             RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
317             if (previous == null) {
318                 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
319             }
320
321         }
322
323         public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
324             boolean removed = implementations.remove(path, routedRpcRegImpl);
325             if (removed) {
326                 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
327             }
328         }
329     }
330
331     private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
332             RpcRegistration {
333         private final QName type;
334         private SchemaAwareRpcBroker router;
335
336         public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
337             super(instance);
338             this.type = type;
339             this.router = router;
340         }
341
342         @Override
343         public QName getType() {
344             return type;
345         }
346
347         @Override
348         protected void removeRegistration() {
349             if (router != null) {
350                 router.remove(this);
351                 router = null;
352             }
353         }
354     }
355
356     private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
357             RoutedRpcRegistration {
358
359         private final QName type;
360         private RoutedRpcSelector router;
361
362         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
363             super(implementation);
364             this.type = rpcType;
365             router = routedRpcSelector;
366         }
367
368         @Override
369         public void registerPath(QName context, InstanceIdentifier path) {
370             router.addPath(context, path, this);
371         }
372
373         @Override
374         public void unregisterPath(QName context, InstanceIdentifier path) {
375             router.removePath(context, path, this);
376         }
377
378         @Override
379         protected void removeRegistration() {
380
381         }
382
383         @Override
384         public QName getType() {
385             return type;
386         }
387
388     }
389
390     private void remove(GlobalRpcRegistration registration) {
391         implementations.remove(registration.getType(), registration);
392     }
393
394     private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
395         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
396         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
397         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
398             try {
399                 routeListener.getInstance().onRouteChange(change);
400             } catch (Exception e) {
401                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
402                 
403             }
404         }
405         
406     }
407
408     
409
410     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
411         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
412         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
413         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
414             try {
415                 routeListener.getInstance().onRouteChange(change);
416             } catch (Exception e) {
417                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
418             }
419         }
420     }
421     
422     @Override
423     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
424             L listener) {
425         return routeChangeListeners.registerWithType(listener);
426     }
427 }