Merge changes I8c23739a,Ia0e70828
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16
17 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
18 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
19 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
20 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
21 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
22 import org.opendaylight.controller.sal.core.api.RpcImplementation;
23 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
24 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
25 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
26 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
27 import org.opendaylight.yangtools.concepts.Identifiable;
28 import org.opendaylight.yangtools.concepts.ListenerRegistration;
29 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
30 import org.opendaylight.yangtools.yang.common.QName;
31 import org.opendaylight.yangtools.yang.common.RpcResult;
32 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
33 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
35 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
36 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
37 import org.opendaylight.yangtools.yang.model.api.Module;
38 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.google.common.base.Optional;
45 import com.google.common.collect.ImmutableSet;
46
47 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
48
49     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
50
51     private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
52             "2013-07-09", "context-reference");
53     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
54     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
55     
56
57     private final String identifier;
58     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
59     private RpcImplementation defaultImplementation;
60     private SchemaContextProvider schemaProvider;
61
62     public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
63         super();
64         this.identifier = identifier;
65         this.schemaProvider = schemaProvider;
66     }
67
68     public RpcImplementation getDefaultImplementation() {
69         return defaultImplementation;
70     }
71
72     public void setDefaultImplementation(RpcImplementation defaultImplementation) {
73         this.defaultImplementation = defaultImplementation;
74     }
75
76     public SchemaContextProvider getSchemaProvider() {
77         return schemaProvider;
78     }
79
80     public void setSchemaProvider(SchemaContextProvider schemaProvider) {
81         this.schemaProvider = schemaProvider;
82     }
83
84     @Override
85     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
86         checkArgument(rpcType != null, "RPC Type should not be null");
87         checkArgument(implementation != null, "RPC Implementatoin should not be null");
88         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
89     }
90
91     private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
92         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
93         if (potential != null) {
94             return potential;
95         }
96         synchronized (implementations) {
97             potential = getRoutedRpcRouter(rpcType);
98             if (potential != null) {
99                 return potential;
100             }
101             RpcDefinition definition = findRpcDefinition(rpcType);
102             RoutingStrategy strategy = getRoutingStrategy(definition);
103             checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
104             potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
105             implementations.put(rpcType, potential);
106             return potential;
107         }
108     }
109
110     private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
111         RpcImplementation potential = implementations.get(rpcType);
112         if (potential != null) {
113             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
114             return (RoutedRpcSelector) potential;
115         }
116         return null;
117
118     }
119
120     @Override
121     public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
122             throws IllegalArgumentException {
123         checkArgument(rpcType != null, "RPC Type should not be null");
124         checkArgument(implementation != null, "RPC Implementatoin should not be null");
125         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
126         RpcDefinition definition = findRpcDefinition(rpcType);
127         checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
128         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
129         implementations.putIfAbsent(rpcType, implementation);
130         return reg;
131     }
132
133     private boolean isRoutedRpc(RpcDefinition definition) {
134         return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
135     }
136
137     @Override
138     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
139         return rpcRegistrationListeners.register(listener);
140     }
141
142     @Override
143     public String getIdentifier() {
144         return identifier;
145     }
146
147     @Override
148     public Set<QName> getSupportedRpcs() {
149         return ImmutableSet.copyOf(implementations.keySet());
150     }
151
152     @Override
153     public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
154         return findRpcImplemention(rpc).invokeRpc(rpc, input);
155     }
156
157     private RpcImplementation findRpcImplemention(QName rpc) {
158         checkArgument(rpc != null, "Rpc name should not be null");
159         RpcImplementation potentialImpl = implementations.get(rpc);
160         if (potentialImpl != null) {
161             return potentialImpl;
162         }
163         potentialImpl = defaultImplementation;
164         checkState(potentialImpl != null, "Implementation is not available.");
165         return potentialImpl;
166     }
167
168     private boolean hasRpcImplementation(QName rpc) {
169         return implementations.containsKey(rpc);
170     }
171
172     private RpcDefinition findRpcDefinition(QName rpcType) {
173         checkArgument(rpcType != null, "Rpc name must be supplied.");
174         checkState(schemaProvider != null, "Schema Provider is not available.");
175         SchemaContext ctx = schemaProvider.getSchemaContext();
176         checkState(ctx != null, "YANG Schema Context is not available.");
177         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
178         checkState(module != null, "YANG Module is not available.");
179         return findRpcDefinition(rpcType, module.getRpcs());
180     }
181
182     static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
183         checkState(rpcs != null, "Rpc schema is not available.");
184         for (RpcDefinition rpc : rpcs) {
185             if (rpcType.equals(rpc.getQName())) {
186                 return rpc;
187             }
188         }
189         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
190     }
191
192     private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
193         ContainerSchemaNode input = rpc.getInput();
194         if (input != null) {
195             for (DataSchemaNode schemaNode : input.getChildNodes()) {
196                 Optional<QName> context = getRoutingContext(schemaNode);
197                 if (context.isPresent()) {
198                     return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
199                 }
200             }
201         }
202         return createGlobalStrategy(rpc);
203     }
204
205     private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
206         return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
207     }
208
209     private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
210         for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
211             if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
212                 return Optional.fromNullable(extension.getQName());
213             }
214             ;
215         }
216         return Optional.absent();
217     }
218
219     private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
220         GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
221         return ret;
222     }
223
224     private static abstract class RoutingStrategy implements Identifiable<QName> {
225
226         private final QName identifier;
227
228         public RoutingStrategy(QName identifier) {
229             super();
230             this.identifier = identifier;
231         }
232
233         @Override
234         public QName getIdentifier() {
235             return identifier;
236         }
237     }
238
239     private static class GlobalRpcStrategy extends RoutingStrategy {
240
241         public GlobalRpcStrategy(QName identifier) {
242             super(identifier);
243         }
244     }
245
246     private static class RoutedRpcStrategy extends RoutingStrategy {
247
248         private final QName context;
249         private final QName leaf;
250
251         public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
252             super(identifier);
253             this.context = ctx;
254             this.leaf = leaf;
255         }
256
257         public QName getContext() {
258             return context;
259         }
260
261         public QName getLeaf() {
262             return leaf;
263         }
264     }
265
266     private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
267
268         private final RoutedRpcStrategy strategy;
269         private final Set<QName> supportedRpcs;
270         private RpcImplementation defaultDelegate;
271         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
272         private SchemaAwareRpcBroker router;
273
274         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
275             super();
276             this.strategy = strategy;
277             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
278             this.router = router;
279         }
280
281         @Override
282         public QName getIdentifier() {
283             return strategy.getIdentifier();
284         }
285
286         @Override
287         public void close() throws Exception {
288
289         }
290
291         @Override
292         public Set<QName> getSupportedRpcs() {
293             return supportedRpcs;
294         }
295
296         public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
297             return new RoutedRpcRegImpl(rpcType, implementation, this);
298         }
299
300         @Override
301         public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
302             CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
303             checkArgument(inputContainer != null, "Rpc payload must contain input element");
304             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
305             checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
306             Object route = routeContainer.getValue();
307             RpcImplementation potential = null;
308             if (route != null) {
309                 RoutedRpcRegImpl potentialReg = implementations.get(route);
310                 if (potentialReg != null) {
311                     potential = potentialReg.getInstance();
312                 }
313             }
314             if (potential == null) {
315                 potential = defaultDelegate;
316             }
317             checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
318             return potential.invokeRpc(rpc, input);
319         }
320
321         public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
322             //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
323             RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
324             if (previous == null) {
325                 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
326             }
327
328         }
329
330         public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
331             boolean removed = implementations.remove(path, routedRpcRegImpl);
332             if (removed) {
333                 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
334             }
335         }
336     }
337
338     private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
339             RpcRegistration {
340         private final QName type;
341         private SchemaAwareRpcBroker router;
342
343         public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
344             super(instance);
345             this.type = type;
346             this.router = router;
347         }
348
349         @Override
350         public QName getType() {
351             return type;
352         }
353
354         @Override
355         protected void removeRegistration() {
356             if (router != null) {
357                 router.remove(this);
358                 router = null;
359             }
360         }
361     }
362
363     private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
364             RoutedRpcRegistration {
365
366         private final QName type;
367         private RoutedRpcSelector router;
368
369         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
370             super(implementation);
371             this.type = rpcType;
372             router = routedRpcSelector;
373         }
374
375         @Override
376         public void registerPath(QName context, InstanceIdentifier path) {
377             router.addPath(context, path, this);
378         }
379
380         @Override
381         public void unregisterPath(QName context, InstanceIdentifier path) {
382             router.removePath(context, path, this);
383         }
384
385         @Override
386         protected void removeRegistration() {
387
388         }
389
390         @Override
391         public QName getType() {
392             return type;
393         }
394
395     }
396
397     private void remove(GlobalRpcRegistration registration) {
398         implementations.remove(registration.getType(), registration);
399     }
400
401     private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
402         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
403         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
404         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
405             try {
406                 routeListener.getInstance().onRouteChange(change);
407             } catch (Exception e) {
408                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
409                 
410             }
411         }
412         
413     }
414
415     
416
417     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
418         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
419         RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
420         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
421             try {
422                 routeListener.getInstance().onRouteChange(change);
423             } catch (Exception e) {
424                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
425             }
426         }
427     }
428     
429     @Override
430     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
431             L listener) {
432         return routeChangeListeners.registerWithType(listener);
433     }
434 }