Merge "Avoiding hash collisions of a match with its reverse"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 package org.opendaylight.controller.sal.dom.broker.impl;
2
3 import static com.google.common.base.Preconditions.checkArgument;
4 import static com.google.common.base.Preconditions.checkState;
5
6 import java.util.Set;
7 import java.util.concurrent.ConcurrentHashMap;
8 import java.util.concurrent.ConcurrentMap;
9
10 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
11 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
12 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
13 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
14 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
15 import org.opendaylight.controller.sal.core.api.RpcImplementation;
16 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
17 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
18 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
19 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
20 import org.opendaylight.yangtools.concepts.Identifiable;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
26 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
28 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
29 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
30 import org.opendaylight.yangtools.yang.model.api.Module;
31 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.base.Optional;
38 import com.google.common.collect.ImmutableSet;
39
40 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
41
42     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
43
44     private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
45             "2013-07-09", "context-reference");
46     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
47     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
48     
49
50     private final String identifier;
51     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
52     private RpcImplementation defaultImplementation;
53     private SchemaContextProvider schemaProvider;
54
55     public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
56         super();
57         this.identifier = identifier;
58         this.schemaProvider = schemaProvider;
59     }
60
61     public RpcImplementation getDefaultImplementation() {
62         return defaultImplementation;
63     }
64
65     public void setDefaultImplementation(RpcImplementation defaultImplementation) {
66         this.defaultImplementation = defaultImplementation;
67     }
68
69     public SchemaContextProvider getSchemaProvider() {
70         return schemaProvider;
71     }
72
73     public void setSchemaProvider(SchemaContextProvider schemaProvider) {
74         this.schemaProvider = schemaProvider;
75     }
76
77     @Override
78     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
79         checkArgument(rpcType != null, "RPC Type should not be null");
80         checkArgument(implementation != null, "RPC Implementatoin should not be null");
81         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
82     }
83
84     private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
85         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
86         if (potential != null) {
87             return potential;
88         }
89         synchronized (implementations) {
90             potential = getRoutedRpcRouter(rpcType);
91             if (potential != null) {
92                 return potential;
93             }
94             RpcDefinition definition = findRpcDefinition(rpcType);
95             RoutingStrategy strategy = getRoutingStrategy(definition);
96             checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
97             potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
98             implementations.put(rpcType, potential);
99             return potential;
100         }
101     }
102
103     private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
104         RpcImplementation potential = implementations.get(rpcType);
105         if (potential != null) {
106             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
107             return (RoutedRpcSelector) potential;
108         }
109         return null;
110
111     }
112
113     @Override
114     public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
115             throws IllegalArgumentException {
116         checkArgument(rpcType != null, "RPC Type should not be null");
117         checkArgument(implementation != null, "RPC Implementatoin should not be null");
118         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
119         RpcDefinition definition = findRpcDefinition(rpcType);
120         checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
121         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
122         return reg;
123     }
124
125     private boolean isRoutedRpc(RpcDefinition definition) {
126         return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
127     }
128
129     @Override
130     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
131         return rpcRegistrationListeners.register(listener);
132     }
133
134     @Override
135     public String getIdentifier() {
136         return identifier;
137     }
138
139     @Override
140     public Set<QName> getSupportedRpcs() {
141         return ImmutableSet.copyOf(implementations.keySet());
142     }
143
144     @Override
145     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
146         return findRpcImplemention(rpc).invokeRpc(rpc, input);
147     }
148
149     private RpcImplementation findRpcImplemention(QName rpc) {
150         checkArgument(rpc != null, "Rpc name should not be null");
151         RpcImplementation potentialImpl = implementations.get(rpc);
152         if (potentialImpl != null) {
153             return potentialImpl;
154         }
155         potentialImpl = defaultImplementation;
156         checkState(potentialImpl != null, "Implementation is not available.");
157         return potentialImpl;
158     }
159
160     private boolean hasRpcImplementation(QName rpc) {
161         return implementations.containsKey(rpc);
162     }
163
164     private RpcDefinition findRpcDefinition(QName rpcType) {
165         checkArgument(rpcType != null, "Rpc name must be supplied.");
166         checkState(schemaProvider != null, "Schema Provider is not available.");
167         SchemaContext ctx = schemaProvider.getSchemaContext();
168         checkState(ctx != null, "YANG Schema Context is not available.");
169         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
170         checkState(module != null, "YANG Module is not available.");
171         return findRpcDefinition(rpcType, module.getRpcs());
172     }
173
174     static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
175         checkState(rpcs != null, "Rpc schema is not available.");
176         for (RpcDefinition rpc : rpcs) {
177             if (rpcType.equals(rpc.getQName())) {
178                 return rpc;
179             }
180         }
181         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
182     }
183
184     private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
185         ContainerSchemaNode input = rpc.getInput();
186         if (input != null) {
187             for (DataSchemaNode schemaNode : input.getChildNodes()) {
188                 Optional<QName> context = getRoutingContext(schemaNode);
189                 if (context.isPresent()) {
190                     return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
191                 }
192             }
193         }
194         return createGlobalStrategy(rpc);
195     }
196
197     private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
198         return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
199     }
200
201     private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
202         for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
203             if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
204                 return Optional.fromNullable(extension.getQName());
205             }
206             ;
207         }
208         return Optional.absent();
209     }
210
211     private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
212         GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
213         return ret;
214     }
215
216     private static abstract class RoutingStrategy implements Identifiable<QName> {
217
218         private final QName identifier;
219
220         public RoutingStrategy(QName identifier) {
221             super();
222             this.identifier = identifier;
223         }
224
225         @Override
226         public QName getIdentifier() {
227             return identifier;
228         }
229     }
230
231     private static class GlobalRpcStrategy extends RoutingStrategy {
232
233         public GlobalRpcStrategy(QName identifier) {
234             super(identifier);
235         }
236     }
237
238     private static class RoutedRpcStrategy extends RoutingStrategy {
239
240         private final QName context;
241         private final QName leaf;
242
243         public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
244             super(identifier);
245             this.context = ctx;
246             this.leaf = leaf;
247         }
248
249         public QName getContext() {
250             return context;
251         }
252
253         public QName getLeaf() {
254             return leaf;
255         }
256     }
257
258     private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
259
260         private final RoutedRpcStrategy strategy;
261         private final Set<QName> supportedRpcs;
262         private RpcImplementation defaultDelegate;
263         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
264         private SchemaAwareRpcBroker router;
265
266         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
267             super();
268             this.strategy = strategy;
269             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
270             this.router = router;
271         }
272
273         @Override
274         public QName getIdentifier() {
275             return strategy.getIdentifier();
276         }
277
278         @Override
279         public void close() throws Exception {
280
281         }
282
283         @Override
284         public Set<QName> getSupportedRpcs() {
285             return supportedRpcs;
286         }
287
288         public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
289             return new RoutedRpcRegImpl(rpcType, implementation, this);
290         }
291
292         @Override
293         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
294             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
295             checkArgument(inputContainer != null, "Rpc payload must contain input element");
296             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
297             checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
298             Object route = routeContainer.getValue();
299             RpcImplementation potential = null;
300             if (route != null) {
301                 RoutedRpcRegImpl potentialReg = implementations.get(route);
302                 if (potentialReg != null) {
303                     potential = potentialReg.getInstance();
304                 }
305             }
306             if (potential == null) {
307                 potential = defaultDelegate;
308             }
309             checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
310             return potential.invokeRpc(rpc, input);
311         }
312
313         public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
314             //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
315             RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
316             if (previous == null) {
317                 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
318             }
319
320         }
321
322         public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
323             boolean removed = implementations.remove(path, routedRpcRegImpl);
324             if (removed) {
325                 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
326             }
327         }
328     }
329
330     private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
331             RpcRegistration {
332         private final QName type;
333         private SchemaAwareRpcBroker router;
334
335         public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
336             super(instance);
337             this.type = type;
338             this.router = router;
339         }
340
341         @Override
342         public QName getType() {
343             return type;
344         }
345
346         @Override
347         protected void removeRegistration() {
348             if (router != null) {
349                 router.remove(this);
350                 router = null;
351             }
352         }
353     }
354
355     private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
356             RoutedRpcRegistration {
357
358         private final QName type;
359         private RoutedRpcSelector router;
360
361         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
362             super(implementation);
363             this.type = rpcType;
364             router = routedRpcSelector;
365         }
366
367         @Override
368         public void registerPath(QName context, InstanceIdentifier path) {
369             router.addPath(context, path, this);
370         }
371
372         @Override
373         public void unregisterPath(QName context, InstanceIdentifier path) {
374             router.removePath(context, path, this);
375         }
376
377         @Override
378         protected void removeRegistration() {
379
380         }
381
382         @Override
383         public QName getType() {
384             return type;
385         }
386
387     }
388
389     private void remove(GlobalRpcRegistration registration) {
390         implementations.remove(registration.getType(), registration);
391     }
392
393     private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
394         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
395         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
396         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
397             try {
398                 routeListener.getInstance().onRouteChange(change);
399             } catch (Exception e) {
400                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
401                 
402             }
403         }
404         
405     }
406
407     
408
409     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
410         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
411         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
412         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
413             try {
414                 routeListener.getInstance().onRouteChange(change);
415             } catch (Exception e) {
416                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
417             }
418         }
419     }
420     
421     @Override
422     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
423             L listener) {
424         return routeChangeListeners.registerWithType(listener);
425     }
426 }