2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.dom.broker.impl;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.FluentIterable;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.collect.ImmutableSet;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
24 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
25 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
26 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
27 import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
28 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
29 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
30 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
31 import org.opendaylight.controller.sal.core.api.RpcImplementation;
32 import org.opendaylight.controller.sal.core.api.RpcImplementationUnavailableException;
33 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
34 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
35 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.util.ListenerRegistry;
39 import org.opendaylight.yangtools.yang.common.QName;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.model.api.Module;
44 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * RPC broker responsible for routing requests to remote systems.
52 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
54 private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
57 private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
58 private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
61 private final String identifier;
62 private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
63 private RpcImplementation defaultImplementation;
64 private SchemaContextProvider schemaProvider;
65 private RoutedRpcDefaultImplementation defaultDelegate;
67 public SchemaAwareRpcBroker(final String identifier, final SchemaContextProvider schemaProvider) {
69 this.identifier = identifier;
70 this.schemaProvider = schemaProvider;
73 public RpcImplementation getDefaultImplementation() {
74 return defaultImplementation;
77 public void setDefaultImplementation(final RpcImplementation defaultImplementation) {
78 this.defaultImplementation = defaultImplementation;
81 public SchemaContextProvider getSchemaProvider() {
82 return schemaProvider;
85 public void setSchemaProvider(final SchemaContextProvider schemaProvider) {
86 this.schemaProvider = schemaProvider;
89 public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
90 return defaultDelegate;
94 public void setRoutedRpcDefaultDelegate(final RoutedRpcDefaultImplementation defaultDelegate) {
95 this.defaultDelegate = defaultDelegate;
99 public RoutedRpcRegistration addRoutedRpcImplementation(final QName rpcType, final RpcImplementation implementation) {
100 checkArgument(rpcType != null, "RPC Type should not be null");
101 checkArgument(implementation != null, "RPC Implementatoin should not be null");
102 return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
105 private RoutedRpcSelector getOrCreateRoutedRpcRouter(final QName rpcType) {
106 RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
107 if (potential != null) {
110 synchronized (implementations) {
111 potential = getRoutedRpcRouter(rpcType);
112 if (potential != null) {
115 RpcDefinition definition = findRpcDefinition(rpcType);
116 RpcRoutingStrategy strategy = RpcRoutingStrategy.from(definition);
117 checkState(strategy.isContextBasedRouted(), "Rpc %s is not routed.", rpcType);
118 potential = new RoutedRpcSelector( strategy, this);
119 implementations.put(rpcType, potential);
124 private RoutedRpcSelector getRoutedRpcRouter(final QName rpcType) {
125 RpcImplementation potential = implementations.get(rpcType);
126 if (potential != null) {
127 checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
128 return (RoutedRpcSelector) potential;
135 public RpcRegistration addRpcImplementation(final QName rpcType, final RpcImplementation implementation)
136 throws IllegalArgumentException {
137 checkArgument(rpcType != null, "RPC Type should not be null");
138 checkArgument(implementation != null, "RPC Implementatoin should not be null");
139 checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
140 RpcDefinition definition = findRpcDefinition(rpcType);
141 checkArgument(!RpcRoutingStrategy.from(definition).isContextBasedRouted(), "RPC Type must not be content routed.");
142 GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
143 final RpcImplementation previous = implementations.putIfAbsent(rpcType, implementation);
144 Preconditions.checkState(previous == null, "Rpc %s is already registered.",rpcType);
145 notifyRpcAdded(rpcType);
149 private void notifyRpcAdded(final QName rpcType) {
150 for (ListenerRegistration<RpcRegistrationListener> listener : rpcRegistrationListeners) {
152 listener.getInstance().onRpcImplementationAdded(rpcType);
153 } catch (Exception ex) {
154 LOG.error("Unhandled exception during invoking listener {}", listener.getInstance(), ex);
161 public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(final RpcRegistrationListener listener) {
162 ListenerRegistration<RpcRegistrationListener> reg = rpcRegistrationListeners.register(listener);
163 for (QName impl : implementations.keySet()) {
164 listener.onRpcImplementationAdded(impl);
170 public String getIdentifier() {
175 public Set<QName> getSupportedRpcs() {
176 return ImmutableSet.copyOf(implementations.keySet());
180 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode input) {
181 return findRpcImplemention(rpc).invokeRpc(rpc, input);
184 private RpcImplementation findRpcImplemention(final QName rpc) {
185 checkArgument(rpc != null, "Rpc name should not be null");
186 RpcImplementation potentialImpl = implementations.get(rpc);
187 if (potentialImpl != null) {
188 return potentialImpl;
191 potentialImpl = defaultImplementation;
192 if( potentialImpl == null ) {
193 throw new UnsupportedOperationException( "No implementation for this operation is available." );
196 return potentialImpl;
199 private boolean hasRpcImplementation(final QName rpc) {
200 return implementations.containsKey(rpc);
203 private RpcDefinition findRpcDefinition(final QName rpcType) {
204 checkArgument(rpcType != null, "Rpc name must be supplied.");
205 checkState(schemaProvider != null, "Schema Provider is not available.");
206 SchemaContext ctx = schemaProvider.getSchemaContext();
207 checkState(ctx != null, "YANG Schema Context is not available.");
208 Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
209 checkState(module != null, "YANG Module is not available.");
210 return findRpcDefinition(rpcType, module.getRpcs());
213 static private RpcDefinition findRpcDefinition(final QName rpcType, final Set<RpcDefinition> rpcs) {
214 checkState(rpcs != null, "Rpc schema is not available.");
215 for (RpcDefinition rpc : rpcs) {
216 if (rpcType.equals(rpc.getQName())) {
220 throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
224 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final YangInstanceIdentifier route, final CompositeNode input) {
225 if (defaultDelegate == null) {
226 return Futures.immediateFailedCheckedFuture(new RpcImplementationUnavailableException("No RPC implementation found"));
229 LOG.debug("Forwarding RPC {} path {} to delegate {}", rpc, route);
230 return defaultDelegate.invokeRpc(rpc, route, input);
233 void remove(final GlobalRpcRegistration registration) {
234 implementations.remove(registration.getType(), registration);
237 void notifyPathAnnouncement(final QName context, final QName identifier, final YangInstanceIdentifier path) {
238 RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
239 RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
240 for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
242 routeListener.getInstance().onRouteChange(change);
243 } catch (Exception e) {
244 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
250 void notifyPathWithdrawal(final QName context,final QName identifier, final YangInstanceIdentifier path) {
251 RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
252 RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
253 for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
255 routeListener.getInstance().onRouteChange(change);
256 } catch (Exception e) {
257 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
263 public <L extends RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
265 ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
266 RouteChange<RpcRoutingContext, YangInstanceIdentifier> initial = createInitialRouteChange();
268 listener.onRouteChange(initial);
269 } catch (Exception e) {
270 LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e);
275 private RouteChange<RpcRoutingContext, YangInstanceIdentifier> createInitialRouteChange() {
276 FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
279 ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = ImmutableMap.builder();
280 ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = ImmutableMap.builder();
281 for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
282 final RpcRoutingContext context = routedRpcSelector.getIdentifier();
283 final Set<YangInstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
284 announcements.put(context, paths);
286 return RoutingUtils.change(announcements.build(), removals.build());