b4d7d2d00109b394f36c1ef138176da4fda364ab
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.FluentIterable;
15 import com.google.common.collect.ImmutableMap;
16 import com.google.common.collect.ImmutableSet;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19
20 import java.util.Set;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23
24 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
25 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
26 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
27 import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
28 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
29 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
30 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
31 import org.opendaylight.controller.sal.core.api.RpcImplementation;
32 import org.opendaylight.controller.sal.core.api.RpcImplementationUnavailableException;
33 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
34 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
35 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.util.ListenerRegistry;
39 import org.opendaylight.yangtools.yang.common.QName;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.model.api.Module;
44 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * RPC broker responsible for routing requests to remote systems.
51  */
52 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
53
54     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
55
56
57     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
58     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
59
60
61     private final String identifier;
62     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
63     private RpcImplementation defaultImplementation;
64     private SchemaContextProvider schemaProvider;
65     private RoutedRpcDefaultImplementation defaultDelegate;
66
67     public SchemaAwareRpcBroker(final String identifier, final SchemaContextProvider schemaProvider) {
68         super();
69         this.identifier = identifier;
70         this.schemaProvider = schemaProvider;
71     }
72
73     public RpcImplementation getDefaultImplementation() {
74         return defaultImplementation;
75     }
76
77     public void setDefaultImplementation(final RpcImplementation defaultImplementation) {
78         this.defaultImplementation = defaultImplementation;
79     }
80
81     public SchemaContextProvider getSchemaProvider() {
82         return schemaProvider;
83     }
84
85     public void setSchemaProvider(final SchemaContextProvider schemaProvider) {
86         this.schemaProvider = schemaProvider;
87     }
88
89     public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
90         return defaultDelegate;
91     }
92
93     @Override
94     public void setRoutedRpcDefaultDelegate(final RoutedRpcDefaultImplementation defaultDelegate) {
95         this.defaultDelegate = defaultDelegate;
96     }
97
98     @Override
99     public RoutedRpcRegistration addRoutedRpcImplementation(final QName rpcType, final RpcImplementation implementation) {
100         checkArgument(rpcType != null, "RPC Type should not be null");
101         checkArgument(implementation != null, "RPC Implementatoin should not be null");
102         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
103     }
104
105     private RoutedRpcSelector getOrCreateRoutedRpcRouter(final QName rpcType) {
106         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
107         if (potential != null) {
108             return potential;
109         }
110         synchronized (implementations) {
111             potential = getRoutedRpcRouter(rpcType);
112             if (potential != null) {
113                 return potential;
114             }
115             RpcDefinition definition = findRpcDefinition(rpcType);
116             RpcRoutingStrategy strategy = RpcRoutingStrategy.from(definition);
117             checkState(strategy.isContextBasedRouted(), "Rpc %s is not routed.", rpcType);
118             potential = new RoutedRpcSelector( strategy, this);
119             implementations.put(rpcType, potential);
120             return potential;
121         }
122     }
123
124     private RoutedRpcSelector getRoutedRpcRouter(final QName rpcType) {
125         RpcImplementation potential = implementations.get(rpcType);
126         if (potential != null) {
127             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
128             return (RoutedRpcSelector) potential;
129         }
130         return null;
131
132     }
133
134     @Override
135     public RpcRegistration addRpcImplementation(final QName rpcType, final RpcImplementation implementation)
136             throws IllegalArgumentException {
137         checkArgument(rpcType != null, "RPC Type should not be null");
138         checkArgument(implementation != null, "RPC Implementatoin should not be null");
139         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
140         RpcDefinition definition = findRpcDefinition(rpcType);
141         checkArgument(!RpcRoutingStrategy.from(definition).isContextBasedRouted(), "RPC Type must not be content routed.");
142         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
143         final RpcImplementation previous = implementations.putIfAbsent(rpcType, implementation);
144         Preconditions.checkState(previous == null, "Rpc %s is already registered.",rpcType);
145         notifyRpcAdded(rpcType);
146         return reg;
147     }
148
149     private void notifyRpcAdded(final QName rpcType) {
150         for (ListenerRegistration<RpcRegistrationListener> listener : rpcRegistrationListeners) {
151             try {
152                 listener.getInstance().onRpcImplementationAdded(rpcType);
153             } catch (Exception ex) {
154                 LOG.error("Unhandled exception during invoking listener {}", listener.getInstance(), ex);
155             }
156
157         }
158     }
159
160     @Override
161     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(final RpcRegistrationListener listener) {
162         ListenerRegistration<RpcRegistrationListener> reg = rpcRegistrationListeners.register(listener);
163         for (QName impl : implementations.keySet()) {
164             listener.onRpcImplementationAdded(impl);
165         }
166         return reg;
167     }
168
169     @Override
170     public String getIdentifier() {
171         return identifier;
172     }
173
174     @Override
175     public Set<QName> getSupportedRpcs() {
176         return ImmutableSet.copyOf(implementations.keySet());
177     }
178
179     @Override
180     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode input) {
181         return findRpcImplemention(rpc).invokeRpc(rpc, input);
182     }
183
184     private RpcImplementation findRpcImplemention(final QName rpc) {
185         checkArgument(rpc != null, "Rpc name should not be null");
186         RpcImplementation potentialImpl = implementations.get(rpc);
187         if (potentialImpl != null) {
188             return potentialImpl;
189         }
190
191         potentialImpl = defaultImplementation;
192         if( potentialImpl == null ) {
193             throw new UnsupportedOperationException( "No implementation for this operation is available." );
194         }
195
196         return potentialImpl;
197     }
198
199     private boolean hasRpcImplementation(final QName rpc) {
200         return implementations.containsKey(rpc);
201     }
202
203     private RpcDefinition findRpcDefinition(final QName rpcType) {
204         checkArgument(rpcType != null, "Rpc name must be supplied.");
205         checkState(schemaProvider != null, "Schema Provider is not available.");
206         SchemaContext ctx = schemaProvider.getSchemaContext();
207         checkState(ctx != null, "YANG Schema Context is not available.");
208         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
209         checkState(module != null, "YANG Module is not available.");
210         return findRpcDefinition(rpcType, module.getRpcs());
211     }
212
213     static private RpcDefinition findRpcDefinition(final QName rpcType, final Set<RpcDefinition> rpcs) {
214         checkState(rpcs != null, "Rpc schema is not available.");
215         for (RpcDefinition rpc : rpcs) {
216             if (rpcType.equals(rpc.getQName())) {
217                 return rpc;
218             }
219         }
220         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
221     }
222
223     @Override
224     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final YangInstanceIdentifier route, final CompositeNode input) {
225         if (defaultDelegate == null) {
226             return Futures.immediateFailedCheckedFuture(new RpcImplementationUnavailableException("No RPC implementation found"));
227         }
228
229         LOG.debug("Forwarding RPC {} path {} to delegate {}", rpc, route);
230         return defaultDelegate.invokeRpc(rpc, route, input);
231     }
232
233     void remove(final GlobalRpcRegistration registration) {
234         implementations.remove(registration.getType(), registration);
235     }
236
237     void notifyPathAnnouncement(final QName context, final QName identifier, final YangInstanceIdentifier path) {
238         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
239         RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
240         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
241             try {
242                 routeListener.getInstance().onRouteChange(change);
243             } catch (Exception e) {
244                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
245             }
246         }
247
248     }
249
250     void notifyPathWithdrawal(final QName context,final QName identifier, final YangInstanceIdentifier path) {
251         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
252         RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
253         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
254             try {
255                 routeListener.getInstance().onRouteChange(change);
256             } catch (Exception e) {
257                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
258             }
259         }
260     }
261
262     @Override
263     public <L extends RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
264             final L listener) {
265         ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
266         RouteChange<RpcRoutingContext, YangInstanceIdentifier> initial = createInitialRouteChange();
267         try {
268         listener.onRouteChange(initial);
269         } catch (Exception e) {
270             LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e);
271         }
272         return reg;
273     }
274
275     private RouteChange<RpcRoutingContext, YangInstanceIdentifier> createInitialRouteChange() {
276         FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
277
278
279         ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = ImmutableMap.builder();
280         ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = ImmutableMap.builder();
281         for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
282             final RpcRoutingContext context = routedRpcSelector.getIdentifier();
283             final Set<YangInstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
284             announcements.put(context, paths);
285         }
286         return RoutingUtils.change(announcements.build(), removals.build());
287     }
288 }