2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.dom.broker.impl;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
17 import com.google.common.base.Preconditions;
18 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
19 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
20 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
21 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
22 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
23 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
24 import org.opendaylight.controller.sal.core.api.RpcImplementation;
25 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
26 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
27 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
28 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
29 import org.opendaylight.yangtools.concepts.Identifiable;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
32 import org.opendaylight.yangtools.yang.common.QName;
33 import org.opendaylight.yangtools.yang.common.RpcResult;
34 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
37 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
38 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
39 import org.opendaylight.yangtools.yang.model.api.Module;
40 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 import com.google.common.base.Optional;
47 import com.google.common.collect.FluentIterable;
48 import com.google.common.collect.ImmutableMap;
49 import com.google.common.collect.ImmutableSet;
50 import com.google.common.util.concurrent.ListenableFuture;
52 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
// Logger shared by all instances of this broker.
54 private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
// QName of the yang-ext "context-reference" extension. getRoutingContext() compares it with the
// node type of each unknown schema node to find the input leaf that carries the route.
56 private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
57 "2013-07-09", "context-reference");
// Listeners informed whenever an RPC implementation is added (see notifyRpcAdded).
58 private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
// Listeners informed about routed-path announcements and withdrawals.
59 private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
// Identifier supplied to the constructor.
62 private final String identifier;
// Registered implementations keyed by RPC QName. A value is either the implementation passed to
// addRpcImplementation() or a RoutedRpcSelector created by getOrCreateRoutedRpcRouter().
63 private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
// Fallback used by findRpcImplemention() when nothing is registered for the requested RPC; may be null.
64 private RpcImplementation defaultImplementation;
// Supplies the schema context that findRpcDefinition() searches for RPC definitions.
65 private SchemaContextProvider schemaProvider;
// Target of invokeRpc(QName, InstanceIdentifier, CompositeNode); may be null until set.
66 private RoutedRpcDefaultImplementation defaultDelegate;
/**
 * Creates a broker with the given identifier and schema source.
 *
 * @param identifier identifier stored for this broker
 * @param schemaProvider provider of the schema context used to resolve RPC definitions
 */
public SchemaAwareRpcBroker(final String identifier, final SchemaContextProvider schemaProvider) {
    this.schemaProvider = schemaProvider;
    this.identifier = identifier;
}
74 public RpcImplementation getDefaultImplementation() {
75 return defaultImplementation;
78 public void setDefaultImplementation(RpcImplementation defaultImplementation) {
79 this.defaultImplementation = defaultImplementation;
82 public SchemaContextProvider getSchemaProvider() {
83 return schemaProvider;
86 public void setSchemaProvider(SchemaContextProvider schemaProvider) {
87 this.schemaProvider = schemaProvider;
90 public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
91 return defaultDelegate;
95 public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
96 this.defaultDelegate = defaultDelegate;
100 public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
101 checkArgument(rpcType != null, "RPC Type should not be null");
102 checkArgument(implementation != null, "RPC Implementatoin should not be null");
103 return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
106 private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
107 RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
108 if (potential != null) {
111 synchronized (implementations) {
112 potential = getRoutedRpcRouter(rpcType);
113 if (potential != null) {
116 RpcDefinition definition = findRpcDefinition(rpcType);
117 RoutingStrategy strategy = getRoutingStrategy(definition);
118 checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
119 potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
120 implementations.put(rpcType, potential);
125 private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
126 RpcImplementation potential = implementations.get(rpcType);
127 if (potential != null) {
128 checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
129 return (RoutedRpcSelector) potential;
136 public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
137 throws IllegalArgumentException {
138 checkArgument(rpcType != null, "RPC Type should not be null");
139 checkArgument(implementation != null, "RPC Implementatoin should not be null");
140 checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
141 RpcDefinition definition = findRpcDefinition(rpcType);
142 checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
143 GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
144 final RpcImplementation previous = implementations.putIfAbsent(rpcType, implementation);
145 Preconditions.checkState(previous == null, "Rpc %s is already registered.",rpcType);
146 notifyRpcAdded(rpcType);
150 private void notifyRpcAdded(QName rpcType) {
151 for (ListenerRegistration<RpcRegistrationListener> listener : rpcRegistrationListeners) {
153 listener.getInstance().onRpcImplementationAdded(rpcType);
154 } catch (Exception ex) {
155 LOG.error("Unhandled exception during invoking listener {}", listener.getInstance(), ex);
161 private boolean isRoutedRpc(RpcDefinition definition) {
162 return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
166 public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
167 ListenerRegistration<RpcRegistrationListener> reg = rpcRegistrationListeners.register(listener);
168 for (QName impl : implementations.keySet()) {
169 listener.onRpcImplementationAdded(impl);
175 public String getIdentifier() {
180 public Set<QName> getSupportedRpcs() {
181 return ImmutableSet.copyOf(implementations.keySet());
185 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
186 return findRpcImplemention(rpc).invokeRpc(rpc, input);
189 private RpcImplementation findRpcImplemention(QName rpc) {
190 checkArgument(rpc != null, "Rpc name should not be null");
191 RpcImplementation potentialImpl = implementations.get(rpc);
192 if (potentialImpl != null) {
193 return potentialImpl;
196 potentialImpl = defaultImplementation;
197 if( potentialImpl == null ) {
198 throw new UnsupportedOperationException( "No implementation for this operation is available." );
201 return potentialImpl;
204 private boolean hasRpcImplementation(QName rpc) {
205 return implementations.containsKey(rpc);
208 private RpcDefinition findRpcDefinition(QName rpcType) {
209 checkArgument(rpcType != null, "Rpc name must be supplied.");
210 checkState(schemaProvider != null, "Schema Provider is not available.");
211 SchemaContext ctx = schemaProvider.getSchemaContext();
212 checkState(ctx != null, "YANG Schema Context is not available.");
213 Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
214 checkState(module != null, "YANG Module is not available.");
215 return findRpcDefinition(rpcType, module.getRpcs());
218 static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
219 checkState(rpcs != null, "Rpc schema is not available.");
220 for (RpcDefinition rpc : rpcs) {
221 if (rpcType.equals(rpc.getQName())) {
225 throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
228 private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
229 ContainerSchemaNode input = rpc.getInput();
231 for (DataSchemaNode schemaNode : input.getChildNodes()) {
232 Optional<QName> context = getRoutingContext(schemaNode);
233 if (context.isPresent()) {
234 return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
238 return createGlobalStrategy(rpc);
241 private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
242 return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
245 private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
246 for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
247 if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
248 return Optional.fromNullable(extension.getQName());
251 return Optional.absent();
254 private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
255 GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
260 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
261 checkState(defaultDelegate != null);
262 return defaultDelegate.invokeRpc(rpc, identifier, input);
265 private static abstract class RoutingStrategy implements Identifiable<QName> {
267 private final QName identifier;
269 public RoutingStrategy(QName identifier) {
271 this.identifier = identifier;
275 public QName getIdentifier() {
280 private static class GlobalRpcStrategy extends RoutingStrategy {
282 public GlobalRpcStrategy(QName identifier) {
287 private static class RoutedRpcStrategy extends RoutingStrategy {
289 private final QName context;
290 private final QName leaf;
292 public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
298 public QName getContext() {
302 public QName getLeaf() {
307 private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
// Strategy (RPC name, context and routing leaf) this selector was created for.
309 private final RoutedRpcStrategy strategy;
// Single-element set containing the strategy identifier; returned by getSupportedRpcs().
310 private final Set<QName> supportedRpcs;
// Routing context built in the constructor from the strategy's context and identifier.
311 private final RpcRoutingContext identifier;
// NOTE(review): no visible line assigns or reads this field — confirm whether it is still needed.
312 private RpcImplementation defaultDelegate;
// Registrations keyed by route; invokeRpc() looks up the route taken from the RPC input here.
313 private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
// Owning broker: receives path announcements/withdrawals and invocations that match no registered path.
314 private final SchemaAwareRpcBroker router;
/**
 * Creates a selector for one routed RPC.
 *
 * @param strategy routed strategy describing the RPC, its context and its routing leaf
 * @param router broker that owns this selector
 */
public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
    this.router = router;
    this.strategy = strategy;
    this.supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
    this.identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
}
325 public RpcRoutingContext getIdentifier() {
// Implements AutoCloseable, which the enclosing class declares.
// NOTE(review): the method body is not visible in this view — confirm whether any cleanup is performed.
330 public void close() throws Exception {
335 public Set<QName> getSupportedRpcs() {
336 return supportedRpcs;
339 public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
340 return new RoutedRpcRegImpl(rpcType, implementation, this);
344 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
345 CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
346 checkArgument(inputContainer != null, "Rpc payload must contain input element");
347 SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
348 checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
349 Object route = routeContainer.getValue();
350 checkArgument(route instanceof InstanceIdentifier,
351 "The routed node %s is not an instance identifier", route);
352 RpcImplementation potential = null;
354 RoutedRpcRegImpl potentialReg = implementations.get(route);
355 if (potentialReg != null) {
356 potential = potentialReg.getInstance();
359 if (potential == null) {
360 return router.invokeRpc(rpc, (InstanceIdentifier) route, input);
362 checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
363 return potential.invokeRpc(rpc, input);
366 public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
367 //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
368 RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
369 if (previous == null) {
370 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
375 public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
376 boolean removed = implementations.remove(path, routedRpcRegImpl);
378 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
// Registration handle for a global (non-routed) RPC implementation; instances are created in
// addRpcImplementation(). Several short lines of this class are not visible in this view.
383 private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
// QName of the RPC this registration belongs to.
385 private final QName type;
// Broker that created the registration. Not final — presumably cleared on removal; TODO confirm.
386 private SchemaAwareRpcBroker router;
// Stores the owning broker; the handling of type and instance is on lines not visible here.
388 public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
391 this.router = router;
// Accessor for the RPC QName (body not visible in this view).
395 public QName getType() {
// Removal hook; presumably invoked by AbstractObjectRegistration when the registration is closed — confirm.
400 protected void removeRegistration() {
// Acts only while a broker is still attached; the guarded statements are not visible in this view.
401 if (router != null) {
// Registration handle for one routed RPC implementation; created by
// RoutedRpcSelector.addRoutedRpcImplementation(). Several short lines are not visible in this view.
408 private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
409 RoutedRpcRegistration {
// QName of the routed RPC (its assignment is on a line not visible here).
411 private final QName type;
// Selector that keeps the path-to-registration bindings for this RPC.
412 private final RoutedRpcSelector router;
// Passes the implementation to the base registration and remembers the owning selector.
414 public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
415 super(implementation);
417 router = routedRpcSelector;
// Binds the given path to this registration in the owning selector.
421 public void registerPath(QName context, InstanceIdentifier path) {
422 router.addPath(context, path, this);
// Removes the binding of the given path to this registration from the owning selector.
426 public void unregisterPath(QName context, InstanceIdentifier path) {
427 router.removePath(context, path, this);
// Removal hook. NOTE(review): body not visible in this view — confirm whether registered paths are withdrawn.
431 protected void removeRegistration() {
// Accessor for the RPC QName (body not visible in this view).
436 public QName getType() {
442 private void remove(GlobalRpcRegistration registration) {
443 implementations.remove(registration.getType(), registration);
446 private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
447 RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
448 RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
449 for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
451 routeListener.getInstance().onRouteChange(change);
452 } catch (Exception e) {
453 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
462 private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
463 RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
464 RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
465 for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
467 routeListener.getInstance().onRouteChange(change);
468 } catch (Exception e) {
469 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
475 public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
477 ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
478 RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
480 listener.onRouteChange(initial);
481 } catch (Exception e) {
482 LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e);
487 private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
488 FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
491 ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
492 ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
493 for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
494 final RpcRoutingContext context = routedRpcSelector.getIdentifier();
495 final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
496 announcements.put(context, paths);
498 return RoutingUtils.change(announcements.build(), removals.build());