2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.dom.broker.impl;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
17 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
18 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
19 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
20 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
21 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
22 import org.opendaylight.controller.sal.core.api.RpcImplementation;
23 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
24 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
25 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
26 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
27 import org.opendaylight.yangtools.concepts.Identifiable;
28 import org.opendaylight.yangtools.concepts.ListenerRegistration;
29 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
30 import org.opendaylight.yangtools.yang.common.QName;
31 import org.opendaylight.yangtools.yang.common.RpcResult;
32 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
33 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.SimpleNode;
35 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
36 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
37 import org.opendaylight.yangtools.yang.model.api.Module;
38 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import com.google.common.base.Optional;
45 import com.google.common.collect.ImmutableSet;
47 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
// Logger; used when a route-change listener throws while being notified.
49 private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
// Node type of the yang-ext "context-reference" extension; getRoutingContext() compares
// each unknown schema node's node type against it to detect a routed RPC.
51 private static final QName CONTEXT_REFERENCE = QName.create("urn:opendaylight:yang:extension:yang-ext",
52 "2013-07-09", "context-reference");
// Listeners added through addRpcRegistrationListener().
53 private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
// Listeners informed by notifyPathAnnouncement() / notifyPathWithdrawal().
54 private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
// Identifier supplied to the constructor.
57 private final String identifier;
// RPC QName -> implementation. Routed RPCs are stored as a RoutedRpcSelector; the map is
// also used as the monitor when such a selector is created.
58 private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
// Fallback used by findRpcImplemention() when no implementation is registered for an RPC.
59 private RpcImplementation defaultImplementation;
// Source of the SchemaContext used by findRpcDefinition().
60 private SchemaContextProvider schemaProvider;
/**
 * Creates a broker with no registered implementations.
 *
 * @param identifier identifier of this broker
 * @param schemaProvider provider of the schema context used to resolve RPC definitions
 */
public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
    super();
    this.schemaProvider = schemaProvider;
    this.identifier = identifier;
}
68 public RpcImplementation getDefaultImplementation() {
69 return defaultImplementation;
72 public void setDefaultImplementation(RpcImplementation defaultImplementation) {
73 this.defaultImplementation = defaultImplementation;
76 public SchemaContextProvider getSchemaProvider() {
77 return schemaProvider;
80 public void setSchemaProvider(SchemaContextProvider schemaProvider) {
81 this.schemaProvider = schemaProvider;
85 public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
86 checkArgument(rpcType != null, "RPC Type should not be null");
87 checkArgument(implementation != null, "RPC Implementatoin should not be null");
88 return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
91 private RoutedRpcSelector getOrCreateRoutedRpcRouter(QName rpcType) {
92 RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
93 if (potential != null) {
96 synchronized (implementations) {
97 potential = getRoutedRpcRouter(rpcType);
98 if (potential != null) {
101 RpcDefinition definition = findRpcDefinition(rpcType);
102 RoutingStrategy strategy = getRoutingStrategy(definition);
103 checkState(strategy instanceof RoutedRpcStrategy, "Rpc %s is not routed.", rpcType);
104 potential = new RoutedRpcSelector((RoutedRpcStrategy) strategy, this);
105 implementations.put(rpcType, potential);
110 private RoutedRpcSelector getRoutedRpcRouter(QName rpcType) {
111 RpcImplementation potential = implementations.get(rpcType);
112 if (potential != null) {
113 checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
114 return (RoutedRpcSelector) potential;
121 public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
122 throws IllegalArgumentException {
123 checkArgument(rpcType != null, "RPC Type should not be null");
124 checkArgument(implementation != null, "RPC Implementatoin should not be null");
125 checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
126 RpcDefinition definition = findRpcDefinition(rpcType);
127 checkArgument(!isRoutedRpc(definition), "RPC Type must not be routed.");
128 GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
129 implementations.putIfAbsent(rpcType, implementation);
133 private boolean isRoutedRpc(RpcDefinition definition) {
134 return getRoutingStrategy(definition) instanceof RoutedRpcStrategy;
138 public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener) {
139 return rpcRegistrationListeners.register(listener);
143 public String getIdentifier() {
148 public Set<QName> getSupportedRpcs() {
149 return ImmutableSet.copyOf(implementations.keySet());
153 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
154 return findRpcImplemention(rpc).invokeRpc(rpc, input);
157 private RpcImplementation findRpcImplemention(QName rpc) {
158 checkArgument(rpc != null, "Rpc name should not be null");
159 RpcImplementation potentialImpl = implementations.get(rpc);
160 if (potentialImpl != null) {
161 return potentialImpl;
163 potentialImpl = defaultImplementation;
164 checkState(potentialImpl != null, "Implementation is not available.");
165 return potentialImpl;
168 private boolean hasRpcImplementation(QName rpc) {
169 return implementations.containsKey(rpc);
172 private RpcDefinition findRpcDefinition(QName rpcType) {
173 checkArgument(rpcType != null, "Rpc name must be supplied.");
174 checkState(schemaProvider != null, "Schema Provider is not available.");
175 SchemaContext ctx = schemaProvider.getSchemaContext();
176 checkState(ctx != null, "YANG Schema Context is not available.");
177 Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
178 checkState(module != null, "YANG Module is not available.");
179 return findRpcDefinition(rpcType, module.getRpcs());
182 static private RpcDefinition findRpcDefinition(QName rpcType, Set<RpcDefinition> rpcs) {
183 checkState(rpcs != null, "Rpc schema is not available.");
184 for (RpcDefinition rpc : rpcs) {
185 if (rpcType.equals(rpc.getQName())) {
189 throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
192 private RoutingStrategy getRoutingStrategy(RpcDefinition rpc) {
193 ContainerSchemaNode input = rpc.getInput();
195 for (DataSchemaNode schemaNode : input.getChildNodes()) {
196 Optional<QName> context = getRoutingContext(schemaNode);
197 if (context.isPresent()) {
198 return createRoutedStrategy(rpc, context.get(), schemaNode.getQName());
202 return createGlobalStrategy(rpc);
205 private static RoutingStrategy createRoutedStrategy(RpcDefinition rpc, QName context, QName leafNode) {
206 return new RoutedRpcStrategy(rpc.getQName(), context, leafNode);
209 private Optional<QName> getRoutingContext(DataSchemaNode schemaNode) {
210 for (UnknownSchemaNode extension : schemaNode.getUnknownSchemaNodes()) {
211 if (CONTEXT_REFERENCE.equals(extension.getNodeType())) {
212 return Optional.fromNullable(extension.getQName());
216 return Optional.absent();
219 private static RoutingStrategy createGlobalStrategy(RpcDefinition rpc) {
220 GlobalRpcStrategy ret = new GlobalRpcStrategy(rpc.getQName());
224 private static abstract class RoutingStrategy implements Identifiable<QName> {
226 private final QName identifier;
228 public RoutingStrategy(QName identifier) {
230 this.identifier = identifier;
234 public QName getIdentifier() {
239 private static class GlobalRpcStrategy extends RoutingStrategy {
241 public GlobalRpcStrategy(QName identifier) {
246 private static class RoutedRpcStrategy extends RoutingStrategy {
248 private final QName context;
249 private final QName leaf;
251 public RoutedRpcStrategy(QName identifier, QName ctx, QName leaf) {
257 public QName getContext() {
261 public QName getLeaf() {
266 private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
268 private final RoutedRpcStrategy strategy;
269 private final Set<QName> supportedRpcs;
270 private RpcImplementation defaultDelegate;
271 private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
272 private SchemaAwareRpcBroker router;
274 public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
276 this.strategy = strategy;
277 supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
278 this.router = router;
282 public QName getIdentifier() {
283 return strategy.getIdentifier();
287 public void close() throws Exception {
292 public Set<QName> getSupportedRpcs() {
293 return supportedRpcs;
296 public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
297 return new RoutedRpcRegImpl(rpcType, implementation, this);
301 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
302 CompositeNode inputContainer = input.getFirstCompositeByName(QName.create(rpc,"input"));
303 checkArgument(inputContainer != null, "Rpc payload must contain input element");
304 SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
305 checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
306 Object route = routeContainer.getValue();
307 RpcImplementation potential = null;
309 RoutedRpcRegImpl potentialReg = implementations.get(route);
310 if (potentialReg != null) {
311 potential = potentialReg.getInstance();
314 if (potential == null) {
315 potential = defaultDelegate;
317 checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
318 return potential.invokeRpc(rpc, input);
321 public void addPath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
322 //checkArgument(strategy.getContext().equals(context),"Supplied context is not supported.");
323 RoutedRpcRegImpl previous = implementations.put(path, routedRpcRegImpl);
324 if (previous == null) {
325 router.notifyPathAnnouncement(context,strategy.getIdentifier(), path);
330 public void removePath(QName context, InstanceIdentifier path, RoutedRpcRegImpl routedRpcRegImpl) {
331 boolean removed = implementations.remove(path, routedRpcRegImpl);
333 router.notifyPathWithdrawal(context, strategy.getIdentifier(), path);
338 private static class GlobalRpcRegistration extends AbstractObjectRegistration<RpcImplementation> implements
340 private final QName type;
341 private SchemaAwareRpcBroker router;
343 public GlobalRpcRegistration(QName type, RpcImplementation instance, SchemaAwareRpcBroker router) {
346 this.router = router;
350 public QName getType() {
355 protected void removeRegistration() {
356 if (router != null) {
363 private static class RoutedRpcRegImpl extends AbstractObjectRegistration<RpcImplementation> implements
364 RoutedRpcRegistration {
366 private final QName type;
367 private RoutedRpcSelector router;
369 public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
370 super(implementation);
372 router = routedRpcSelector;
376 public void registerPath(QName context, InstanceIdentifier path) {
377 router.addPath(context, path, this);
381 public void unregisterPath(QName context, InstanceIdentifier path) {
382 router.removePath(context, path, this);
386 protected void removeRegistration() {
391 public QName getType() {
397 private void remove(GlobalRpcRegistration registration) {
398 implementations.remove(registration.getType(), registration);
401 private void notifyPathAnnouncement(QName context, QName identifier, InstanceIdentifier path) {
402 RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
403 RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
404 for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
406 routeListener.getInstance().onRouteChange(change);
407 } catch (Exception e) {
408 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
417 private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
418 RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
419 RouteChange<RpcRoutingContext, InstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
420 for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeListener : routeChangeListeners) {
422 routeListener.getInstance().onRouteChange(change);
423 } catch (Exception e) {
424 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
430 public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
432 return routeChangeListeners.registerWithType(listener);