Merge "BUG-190 Simplify reconnect logic in protocol-framework."
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / impl / SchemaAwareRpcBroker.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.dom.broker.impl;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Set;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16
17 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
18 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
19 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
20 import org.opendaylight.controller.md.sal.dom.broker.spi.rpc.RpcRoutingStrategy;
21 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
22 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
23 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
24 import org.opendaylight.controller.sal.core.api.RpcImplementation;
25 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
26 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
27 import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
31 import org.opendaylight.yangtools.yang.common.QName;
32 import org.opendaylight.yangtools.yang.common.RpcResult;
33 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.model.api.Module;
36 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.google.common.base.Preconditions;
42 import com.google.common.collect.FluentIterable;
43 import com.google.common.collect.ImmutableMap;
44 import com.google.common.collect.ImmutableSet;
45 import com.google.common.util.concurrent.ListenableFuture;
46
47 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
48
49     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
50
51
52     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
53     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
54
55
56     private final String identifier;
57     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
58     private RpcImplementation defaultImplementation;
59     private SchemaContextProvider schemaProvider;
60     private RoutedRpcDefaultImplementation defaultDelegate;
61
62     public SchemaAwareRpcBroker(final String identifier, final SchemaContextProvider schemaProvider) {
63         super();
64         this.identifier = identifier;
65         this.schemaProvider = schemaProvider;
66     }
67
68     public RpcImplementation getDefaultImplementation() {
69         return defaultImplementation;
70     }
71
72     public void setDefaultImplementation(final RpcImplementation defaultImplementation) {
73         this.defaultImplementation = defaultImplementation;
74     }
75
76     public SchemaContextProvider getSchemaProvider() {
77         return schemaProvider;
78     }
79
80     public void setSchemaProvider(final SchemaContextProvider schemaProvider) {
81         this.schemaProvider = schemaProvider;
82     }
83
84     public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
85         return defaultDelegate;
86     }
87
88     @Override
89     public void setRoutedRpcDefaultDelegate(final RoutedRpcDefaultImplementation defaultDelegate) {
90         this.defaultDelegate = defaultDelegate;
91     }
92
93     @Override
94     public RoutedRpcRegistration addRoutedRpcImplementation(final QName rpcType, final RpcImplementation implementation) {
95         checkArgument(rpcType != null, "RPC Type should not be null");
96         checkArgument(implementation != null, "RPC Implementatoin should not be null");
97         return getOrCreateRoutedRpcRouter(rpcType).addRoutedRpcImplementation(rpcType, implementation);
98     }
99
100     private RoutedRpcSelector getOrCreateRoutedRpcRouter(final QName rpcType) {
101         RoutedRpcSelector potential = getRoutedRpcRouter(rpcType);
102         if (potential != null) {
103             return potential;
104         }
105         synchronized (implementations) {
106             potential = getRoutedRpcRouter(rpcType);
107             if (potential != null) {
108                 return potential;
109             }
110             RpcDefinition definition = findRpcDefinition(rpcType);
111             RpcRoutingStrategy strategy = RpcRoutingStrategy.from(definition);
112             checkState(strategy.isContextBasedRouted(), "Rpc %s is not routed.", rpcType);
113             potential = new RoutedRpcSelector( strategy, this);
114             implementations.put(rpcType, potential);
115             return potential;
116         }
117     }
118
119     private RoutedRpcSelector getRoutedRpcRouter(final QName rpcType) {
120         RpcImplementation potential = implementations.get(rpcType);
121         if (potential != null) {
122             checkState(potential instanceof RoutedRpcSelector, "Rpc %s is not routed.", rpcType);
123             return (RoutedRpcSelector) potential;
124         }
125         return null;
126
127     }
128
129     @Override
130     public RpcRegistration addRpcImplementation(final QName rpcType, final RpcImplementation implementation)
131             throws IllegalArgumentException {
132         checkArgument(rpcType != null, "RPC Type should not be null");
133         checkArgument(implementation != null, "RPC Implementatoin should not be null");
134         checkState(!hasRpcImplementation(rpcType), "Implementation already registered");
135         RpcDefinition definition = findRpcDefinition(rpcType);
136         checkArgument(!RpcRoutingStrategy.from(definition).isContextBasedRouted(), "RPC Type must not be content routed.");
137         GlobalRpcRegistration reg = new GlobalRpcRegistration(rpcType, implementation, this);
138         final RpcImplementation previous = implementations.putIfAbsent(rpcType, implementation);
139         Preconditions.checkState(previous == null, "Rpc %s is already registered.",rpcType);
140         notifyRpcAdded(rpcType);
141         return reg;
142     }
143
144     private void notifyRpcAdded(final QName rpcType) {
145         for (ListenerRegistration<RpcRegistrationListener> listener : rpcRegistrationListeners) {
146             try {
147                 listener.getInstance().onRpcImplementationAdded(rpcType);
148             } catch (Exception ex) {
149                 LOG.error("Unhandled exception during invoking listener {}", listener.getInstance(), ex);
150             }
151
152         }
153     }
154
155     @Override
156     public ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(final RpcRegistrationListener listener) {
157         ListenerRegistration<RpcRegistrationListener> reg = rpcRegistrationListeners.register(listener);
158         for (QName impl : implementations.keySet()) {
159             listener.onRpcImplementationAdded(impl);
160         }
161         return reg;
162     }
163
164     @Override
165     public String getIdentifier() {
166         return identifier;
167     }
168
169     @Override
170     public Set<QName> getSupportedRpcs() {
171         return ImmutableSet.copyOf(implementations.keySet());
172     }
173
174     @Override
175     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final CompositeNode input) {
176         return findRpcImplemention(rpc).invokeRpc(rpc, input);
177     }
178
179     private RpcImplementation findRpcImplemention(final QName rpc) {
180         checkArgument(rpc != null, "Rpc name should not be null");
181         RpcImplementation potentialImpl = implementations.get(rpc);
182         if (potentialImpl != null) {
183             return potentialImpl;
184         }
185
186         potentialImpl = defaultImplementation;
187         if( potentialImpl == null ) {
188             throw new UnsupportedOperationException( "No implementation for this operation is available." );
189         }
190
191         return potentialImpl;
192     }
193
194     private boolean hasRpcImplementation(final QName rpc) {
195         return implementations.containsKey(rpc);
196     }
197
198     private RpcDefinition findRpcDefinition(final QName rpcType) {
199         checkArgument(rpcType != null, "Rpc name must be supplied.");
200         checkState(schemaProvider != null, "Schema Provider is not available.");
201         SchemaContext ctx = schemaProvider.getSchemaContext();
202         checkState(ctx != null, "YANG Schema Context is not available.");
203         Module module = ctx.findModuleByNamespaceAndRevision(rpcType.getNamespace(), rpcType.getRevision());
204         checkState(module != null, "YANG Module is not available.");
205         return findRpcDefinition(rpcType, module.getRpcs());
206     }
207
208     static private RpcDefinition findRpcDefinition(final QName rpcType, final Set<RpcDefinition> rpcs) {
209         checkState(rpcs != null, "Rpc schema is not available.");
210         for (RpcDefinition rpc : rpcs) {
211             if (rpcType.equals(rpc.getQName())) {
212                 return rpc;
213             }
214         }
215         throw new IllegalArgumentException("Supplied Rpc Type is not defined.");
216     }
217
218     @Override
219     public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(final QName rpc, final YangInstanceIdentifier route, final CompositeNode input) {
220       checkState(defaultDelegate != null, "No implementation is available for rpc:%s path:%s", rpc, route);
221       return defaultDelegate.invokeRpc(rpc, route, input);
222     }
223
224     void remove(final GlobalRpcRegistration registration) {
225         implementations.remove(registration.getType(), registration);
226     }
227
228     void notifyPathAnnouncement(final QName context, final QName identifier, final YangInstanceIdentifier path) {
229         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
230         RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.announcementChange(contextWrapped , path);
231         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
232             try {
233                 routeListener.getInstance().onRouteChange(change);
234             } catch (Exception e) {
235                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
236             }
237         }
238
239     }
240
241     void notifyPathWithdrawal(final QName context,final QName identifier, final YangInstanceIdentifier path) {
242         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
243         RouteChange<RpcRoutingContext, YangInstanceIdentifier> change = RoutingUtils.removalChange(contextWrapped , path);
244         for(ListenerRegistration<RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> routeListener : routeChangeListeners) {
245             try {
246                 routeListener.getInstance().onRouteChange(change);
247             } catch (Exception e) {
248                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
249             }
250         }
251     }
252
253     @Override
254     public <L extends RouteChangeListener<RpcRoutingContext, YangInstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
255             final L listener) {
256         ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
257         RouteChange<RpcRoutingContext, YangInstanceIdentifier> initial = createInitialRouteChange();
258         try {
259         listener.onRouteChange(initial);
260         } catch (Exception e) {
261             LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener, e);
262         }
263         return reg;
264     }
265
266     private RouteChange<RpcRoutingContext, YangInstanceIdentifier> createInitialRouteChange() {
267         FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
268
269
270         ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> announcements = ImmutableMap.builder();
271         ImmutableMap.Builder<RpcRoutingContext, Set<YangInstanceIdentifier>> removals = ImmutableMap.builder();
272         for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
273             final RpcRoutingContext context = routedRpcSelector.getIdentifier();
274             final Set<YangInstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
275             announcements.put(context, paths);
276         }
277         return RoutingUtils.change(announcements.build(), removals.build());
278     }
279 }