Migrate FRM/SM to use model API SchemaContextProvider
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.FluentIterable;
14 import com.google.common.collect.ImmutableMap;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.Set;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
22 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
23 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
24 import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
25 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
26 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
27 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
28 import org.opendaylight.controller.sal.core.api.RpcImplementation;
29 import org.opendaylight.controller.sal.core.api.RpcImplementationUnavailableException;
30 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
31 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
32 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
33 import org.opendaylight.yangtools.concepts.Identifiable;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.util.ListenerRegistry;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.common.RpcResult;
38 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.model.api.Module;
41 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
42 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContextProvider;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * RPC broker responsible for routing requests to remote systems.
49  */
50 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
51
52     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
53
54
55     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
56     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
57
58
59     private final String identifier;
60     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
61     private RpcImplementation defaultImplementation;
62     private SchemaContextProvider schemaProvider;
63     private RoutedRpcDefaultImplementation defaultDelegate;
64
65     public SchemaAwareRpcBroker(final String identifier, final SchemaContextProvider schemaProvider) {
66         super();
67         this.identifier = identifier;
68         this.schemaProvider = schemaProvider;
69     }
70
71     public RpcImplementation getDefaultImplementation() {
72         return defaultImplementation;
73     }
74
75     public void setDefaultImplementation(final RpcImplementation defaultImplementation) {
76         this.defaultImplementation = defaultImplementation;
77     }
78
79     public SchemaContextProvider getSchemaProvider() {
80         return schemaProvider;
81     }
82
83     public void setSchemaProvider(final SchemaContextProvider schemaProvider) {
84         this.schemaProvider = schemaProvider;
85     }
86
87     public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
88         return defaultDelegate;
89     }
90
91     @Override
92     public void setRoutedRpcDefaultDelegate(final RoutedRpcDefaultImplementation defaultDelegate) {
93         this.defaultDelegate = defaultDelegate;
94     }
95
96     @Override
97     public RoutedRpcRegistration addRoutedRpcImplementation(final QName rpcType, final RpcImplementation implementation) {
98         checkArgument(rpcType != null, "RPC Type should not be null");
99         checkArgument(implementation != null, "RPC Implementatoin should not be null");
100         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
101     }
102
103     private RoutedRpcSelector getOrCreateRoutedRpcRouter(final QName rpcType) {
104         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
105         if (potential != null) {
106             return potential;
107         }
108         synchronized (implementations) {
109             potential = getRoutedRpcRouter(rpcType);
110             if (potential != null) {
111                 return potential;
112             }
113             RpcDefinition definition = findRpcDefinition(rpcType);
114             RpcRoutingStrategy strategy = RpcRoutingStrategy.from(definition);
115             checkState(strategy.isContextBasedRouted(), "Rpc %s is not routed.", rpcType);
116             potential = new RoutedRpcSelector( strategy, this);
117             implementations.put(rpcType, potential);
118             return potential;
119         }
120     }
121
122     private RoutedRpcSelector getRoutedRpcRouter(final QName rpcType) {
123         RpcImplementation potential = implementations.get(rpcType);
124         if (potential != null) {
125             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
126             return (RoutedRpcSelector) potential;
127         }
128         return null;
129
130     }
131
132     @Override
133     public RpcRegistration addRpcImplementation(final QName rpcType, final RpcImplementation implementation)
134             throws IllegalArgumentException {
135         checkArgument(rpcType != null, "RPC Type should not be null");
136         checkArgument(implementation != null, "RPC Implementatoin should not be null");
137         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
138         RpcDefinition definition = findRpcDefinition(rpcType);
139         checkArgument(!RpcRoutingStrategy.from(definition).isContextBasedRouted(), "RPC Type must not be content routed.");
140         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
141         final RpcImplementation previous = implementations.putIfAbsent(rpcType, implementation);
142         Preconditions.checkState(previous == null, "Rpc %s is already registered.",rpcType);
143         notifyRpcAdded(rpcType);
144         return reg;
145     }
146
147     private void notifyRpcAdded(final QName rpcType) {
148         for (ListenerRegistration<RpcRegistrationListener> listener : rpcRegistrationListeners) {
149             try {
150                 listener.getInstance().onRpcImplementationAdded(rpcType);
151             } catch (Exception ex) {
152                 LOG.error("Unhandled exception during invoking listener {}", listener.getInstance(), ex);
153             }
154
155         }
156     }
157
158     @Override
159     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(final RpcRegistrationListener listener) {
160         ListenerRegistration<RpcRegistrationListener> reg = rpcRegistrationListeners.register(listener);
161         for (QName impl : implementations.keySet()) {
162             listener.onRpcImplementationAdded(impl);
163         }
164         return reg;
165     }
166
167     @Override
168     public String getIdentifier() {
169         return identifier;
170     }
171
172     @Override
173     public Set<QName> getSupportedRpcs() {
174         return ImmutableSet.copyOf(implementations.keySet());
175     }
176
177     @Override
178     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode input) {
179         return findRpcImplemention(rpc).invokeRpc(rpc, input);
180     }
181
182     private RpcImplementation findRpcImplemention(final QName rpc) {
183         checkArgument(rpc != null, "Rpc name should not be null");
184         RpcImplementation potentialImpl = implementations.get(rpc);
185         if (potentialImpl != null) {
186             return potentialImpl;
187         }
188
189         potentialImpl = defaultImplementation;
190         if( potentialImpl == null ) {
191             throw new UnsupportedOperationException( "No implementation for this operation is available." );
192         }
193
194         return potentialImpl;
195     }
196
197     private boolean hasRpcImplementation(final QName rpc) {
198         return implementations.containsKey(rpc);
199     }
200
201     private RpcDefinition findRpcDefinition(final QName rpcType) {
202         checkArgument(rpcType != null, "Rpc name must be supplied.");
203         checkState(schemaProvider != null, "Schema Provider is not available.");
204         SchemaContext ctx = schemaProvider.getSchemaContext();
205         checkState(ctx != null, "YANG Schema Context is not available.");
206         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
207         checkState(module != null, "YANG Module is not available.");
208         return findRpcDefinition(rpcType, module.getRpcs());
209     }
210
211     static private RpcDefinition findRpcDefinition(final QName rpcType, final Set<RpcDefinition> rpcs) {
212         checkState(rpcs != null, "Rpc schema is not available.");
213         for (RpcDefinition rpc : rpcs) {
214             if (rpcType.equals(rpc.getQName())) {
215                 return rpc;
216             }
217         }
218         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
219     }
220
221     @Override
222     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final YangInstanceIdentifier route, final CompositeNode input) {
223         if (defaultDelegate == null) {
224             return Futures.immediateFailedCheckedFuture(new RpcImplementationUnavailableException("No RPC implementation found"));
225         }
226
227         LOG.debug("Forwarding RPC {} path {} to delegate {}", rpc, route);
228         return defaultDelegate.invokeRpc(rpc, route, input);
229     }
230
231     void remove(final GlobalRpcRegistration registration) {
232         implementations.remove(registration.getType(), registration);
233     }
234
235     void notifyPathAnnouncement(final QName context, final QName identifier, final YangInstanceIdentifier path) {
236         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
237         RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
238         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
239             try {
240                 routeListener.getInstance().onRouteChange(change);
241             } catch (Exception e) {
242                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
243             }
244         }
245
246     }
247
248     void notifyPathWithdrawal(final QName context,final QName identifier, final YangInstanceIdentifier path) {
249         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
250         RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
251         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
252             try {
253                 routeListener.getInstance().onRouteChange(change);
254             } catch (Exception e) {
255                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
256             }
257         }
258     }
259
260     @Override
261     public <L extends RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
262             final L listener) {
263         ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
264         RouteChange<RpcRoutingContext, YangInstanceIdentifier> initial = createInitialRouteChange();
265         try {
266         listener.onRouteChange(initial);
267         } catch (Exception e) {
268             LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e);
269         }
270         return reg;
271     }
272
273     private RouteChange<RpcRoutingContext, YangInstanceIdentifier> createInitialRouteChange() {
274         FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
275
276
277         ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = ImmutableMap.builder();
278         ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = ImmutableMap.builder();
279         for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
280             final RpcRoutingContext context = routedRpcSelector.getIdentifier();
281             final Set<YangInstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
282             announcements.put(context, paths);
283         }
284         return RoutingUtils.change(announcements.build(), removals.build());
285     }
286 }