1 package org.opendaylight.controller.sal.dom.broker.impl;
3 import static com.google.common.base.Preconditions.checkArgument;
4 import static com.google.common.base.Preconditions.checkState;
7 import java.util.concurrent.ConcurrentHashMap;
8 import java.util.concurrent.ConcurrentMap;
10 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
11 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
12 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
13 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
14 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
15 import org.opendaylight.controller.sal.core.api.RpcImplementation;
16 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
17 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
18 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
19 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
20 import org.opendaylight.yangtools.concepts.Identifiable;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.common.RpcResult;
25 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
26 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
28 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
29 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
30 import org.opendaylight.yangtools.yang.model.api.Module;
31 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
32 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
33 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import com.google.common.base.Optional;
38 import com.google.common.collect.ImmutableSet;
/**
 * RPC router that keeps one {@link RpcImplementation} per RPC QName and looks the RPC up in the
 * YANG schema (through {@code schemaProvider}) to decide whether it is routed or global.
 */
40 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
42 private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
// QName of the "context-reference" extension; getRoutingContext compares it with the node type
// of unknown schema nodes to detect a routing context.
44 private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
45 "2013-07-09", "context-reference");
// Listeners added through addRpcRegistrationListener.
46 private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
// Listeners that receive the route announcements and withdrawals built in notifyPath*.
47 private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
50 private final String identifier;
// RPC QName -> implementation; for routed RPCs the stored value is a RoutedRpcSelector.
51 private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
// Fallback used by findRpcImplemention when nothing is registered for the requested RPC.
52 private RpcImplementation defaultImplementation;
// Source of the SchemaContext consulted by findRpcDefinition; replaceable via setSchemaProvider.
53 private SchemaContextProvider schemaProvider;
/**
 * Creates a broker that resolves RPC definitions through the supplied schema provider.
 *
 * @param identifier     identifier stored for this broker
 * @param schemaProvider provider of the schema context used when RPC definitions are looked up
 */
public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
    this.schemaProvider = schemaProvider;
    this.identifier = identifier;
}
61 public RpcImplementation getDefaultImplementation() {
62 return defaultImplementation;
65 public void setDefaultImplementation(RpcImplementation defaultImplementation) {
66 this.defaultImplementation = defaultImplementation;
69 public SchemaContextProvider getSchemaProvider() {
70 return schemaProvider;
73 public void setSchemaProvider(SchemaContextProvider schemaProvider) {
74 this.schemaProvider = schemaProvider;
78 public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
79 checkArgument(rpcType != null, "RPC Type should not be null");
80 checkArgument(implementation != null, "RPC Implementatoin should not be null");
81 return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
// Looks up the routed selector registered for rpcType and, when there is none, builds one from
// the RPC's schema definition and stores it in the implementations map.
// NOTE(review): the bodies of both `if (potential != null)` checks and the method's final return
// are not visible in this excerpt; presumably each returns `potential` - confirm.
84 private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
// First lookup is done without holding the monitor.
85 RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
86 if (potential != null) {
89 synchronized (implementations) {
// Lookup is repeated while holding the monitor of the implementations map.
90 potential = getRoutedRpcRouter(rpcType);
91 if (potential != null) {
// findRpcDefinition throws when the schema or the RPC definition cannot be resolved.
94 RpcDefinition definition = findRpcDefinition(rpcType);
95 RoutingStrategy strategy = getRoutingStrategy(definition);
// Only RPCs whose schema yields a RoutedRpcStrategy may get a selector.
96 checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
97 potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
98 implementations.put(rpcType, potential);
103 private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
104 RpcImplementation potential = implementations.get(rpcType);
105 if (potential != null) {
106 checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
107 return (RoutedRpcSelector) potential;
114 public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
115 throws IllegalArgumentException {
116 checkArgument(rpcType != null, "RPC Type should not be null");
117 checkArgument(implementation != null, "RPC Implementatoin should not be null");
118 checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
119 RpcDefinition definition = findRpcDefinition(rpcType);
120 checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
121 GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
122 implementations.putIfAbsent(rpcType, implementation);
126 private boolean isRoutedRpc(RpcDefinition definition) {
127 return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
131 public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
132 return rpcRegistrationListeners.register(listener);
// Identifier accessor of the broker (the class implements Identifiable<String>).
// NOTE(review): the method body is not visible in this excerpt; presumably it returns the
// `identifier` field - confirm.
136 public String getIdentifier() {
141 public Set<QName> getSupportedRpcs() {
142 return ImmutableSet.copyOf(implementations.keySet());
146 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
147 return findRpcImplemention(rpc).invokeRpc(rpc, input);
150 private RpcImplementation findRpcImplemention(QName rpc) {
151 checkArgument(rpc != null, "Rpc name should not be null");
152 RpcImplementation potentialImpl = implementations.get(rpc);
153 if (potentialImpl != null) {
154 return potentialImpl;
156 potentialImpl = defaultImplementation;
157 checkState(potentialImpl != null, "Implementation is not available.");
158 return potentialImpl;
161 private boolean hasRpcImplementation(QName rpc) {
162 return implementations.containsKey(rpc);
165 private RpcDefinition findRpcDefinition(QName rpcType) {
166 checkArgument(rpcType != null, "Rpc name must be supplied.");
167 checkState(schemaProvider != null, "Schema Provider is not available.");
168 SchemaContext ctx = schemaProvider.getSchemaContext();
169 checkState(ctx != null, "YANG Schema Context is not available.");
170 Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
171 checkState(module != null, "YANG Module is not available.");
172 return findRpcDefinition(rpcType, module.getRpcs());
175 static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
176 checkState(rpcs != null, "Rpc schema is not available.");
177 for (RpcDefinition rpc : rpcs) {
178 if (rpcType.equals(rpc.getQName())) {
182 throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
// Derives the routing strategy of an RPC from its input container: the first child node that
// carries a routing context yields a routed strategy, otherwise a global strategy is created.
185 private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
186 ContainerSchemaNode input = rpc.getInput();
// NOTE(review): one source line between fetching the input and this loop is not visible in this
// excerpt; the loop dereferences `input`, so confirm that the hidden line guards against
// getInput() returning null.
188 for (DataSchemaNode schemaNode : input.getChildNodes()) {
189 Optional<QName> context = getRoutingContext(schemaNode);
190 if (context.isPresent()) {
// The child node's own QName is passed as the routing leaf of the strategy.
191 return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
// Reached when no child node carried a routing context.
195 return createGlobalStrategy(rpc);
198 private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
199 return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
202 private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
203 for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
204 if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
205 return Optional.fromNullable(extension.getQName());
209 return Optional.absent();
212 private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
213 GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
// Base type of the routing strategies; each strategy is identified by a QName.
217 private static abstract class RoutingStrategy implements Identifiable<QName> {
219 private final QName identifier;
221 public RoutingStrategy(QName identifier) {
223 this.identifier = identifier;
// NOTE(review): the body of this accessor is not visible in this excerpt; presumably it returns
// the `identifier` field - confirm.
227 public QName getIdentifier() {
// Strategy created by createGlobalStrategy for RPCs without a routing context.
232 private static class GlobalRpcStrategy extends RoutingStrategy {
// NOTE(review): constructor body is not visible in this excerpt; presumably it forwards the
// identifier to the RoutingStrategy constructor - confirm.
234 public GlobalRpcStrategy(QName identifier) {
// Strategy created by createRoutedStrategy for RPCs whose input carries a routing context.
239 private static class RoutedRpcStrategy extends RoutingStrategy {
241 private final QName context;
242 private final QName leaf;
// NOTE(review): the constructor body and both accessor bodies are not visible in this excerpt;
// presumably ctx/leaf are stored in the fields above and returned by the getters - confirm.
244 public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
250 public QName getContext() {
// The returned QName is used by RoutedRpcSelector.invokeRpc to read the route from the input.
254 public QName getLeaf() {
// Implementation registered for a routed RPC: it picks the delegate by the route value found
// in the RPC input and reports path changes back to the owning broker.
259 private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
261 private final RoutedRpcStrategy strategy;
262 private final Set<QName> supportedRpcs;
// Fallback used by invokeRpc when no registration matches the route; no assignment to this
// field is visible in this excerpt.
263 private RpcImplementation defaultDelegate;
// Registrations keyed by the path passed to addPath.
264 private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
265 private SchemaAwareRpcBroker router;
267 public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
269 this.strategy = strategy;
// The selector supports exactly one RPC: the one identified by its strategy.
270 supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
271 this.router = router;
275 public QName getIdentifier() {
276 return strategy.getIdentifier();
// AutoCloseable hook of the selector.
// NOTE(review): the method body is not visible in this excerpt, so what (if anything) is
// released here cannot be stated - confirm against the full source.
280 public void close() throws Exception {
285 public Set<QName> getSupportedRpcs() {
286 return supportedRpcs;
289 public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
290 return new RoutedRpcRegImpl(rpcType, implementation, this);
294 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
295 CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
296 checkArgument(inputContainer != null, "Rpc payload must contain input element");
297 SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
298 checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
299 Object route = routeContainer.getValue();
300 RpcImplementation potential = null;
302 RoutedRpcRegImpl potentialReg = implementations.get(route);
303 if (potentialReg != null) {
304 potential = potentialReg.getInstance();
307 if (potential == null) {
308 potential = defaultDelegate;
310 checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
311 return potential.invokeRpc(rpc, input);
314 public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
315 //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
316 RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
317 if (previous == null) {
318 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
// Unmaps a path from the given registration and reports the withdrawal to the broker.
323 public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
// Two-argument remove: the entry is removed only if `path` is currently mapped to this
// registration.
324 boolean removed = implementations.remove(path, routedRpcRegImpl);
// NOTE(review): the line between the removal and this call is not visible in this excerpt;
// presumably the withdrawal is announced only when `removed` is true - confirm.
326 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
// Registration handle created by addRpcImplementation for a global RPC implementation.
// NOTE(review): the implemented interface name, the constructor's first statements, the
// getType body and the body of the `router != null` branch are not visible in this excerpt.
331 private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
333 private final QName type;
334 private SchemaAwareRpcBroker router;
336 public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
339 this.router = router;
343 public QName getType() {
348 protected void removeRegistration() {
// Presumably unregisters from the broker while a router reference is still held - confirm.
349 if (router != null) {
// Registration handle for a routed RPC implementation; path (un)registration is delegated to
// the RoutedRpcSelector that created it.
356 private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
357 RoutedRpcRegistration {
// NOTE(review): the assignment of `type` is not visible in this excerpt; presumably the
// constructor stores rpcType here - confirm.
359 private final QName type;
360 private RoutedRpcSelector router;
362 public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
363 super(implementation);
365 router = routedRpcSelector;
// Maps `path` to this registration in the selector.
369 public void registerPath(QName context, InstanceIdentifier path) {
370 router.addPath(context, path, this);
// Asks the selector to unmap `path` from this registration.
374 public void unregisterPath(QName context, InstanceIdentifier path) {
375 router.removePath(context, path, this);
// NOTE(review): body not visible in this excerpt; what is cleaned up on removal cannot be
// stated from here.
379 protected void removeRegistration() {
// NOTE(review): body not visible in this excerpt; presumably returns `type` - confirm.
384 public QName getType() {
390 private void remove(GlobalRpcRegistration registration) {
391 implementations.remove(registration.getType(), registration);
394 private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
395 RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
396 RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
397 for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
399 routeListener.getInstance().onRouteChange(change);
400 } catch (Exception e) {
401 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
410 private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
411 RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
412 RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
413 for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
415 routeListener.getInstance().onRouteChange(change);
416 } catch (Exception e) {
417 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
// Registers a route change listener; registered listeners receive the announcements and
// withdrawals sent by notifyPathAnnouncement / notifyPathWithdrawal.
// NOTE(review): the parameter list line is not visible in this excerpt; the body shows a
// single argument named `listener`, presumably of type L - confirm.
423 public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
425 return routeChangeListeners.registerWithType(listener);