Merge "Bug 849: Fixed NPE in Translated Data Change Events."
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / RemoteRpcProvider.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connector.remoterpc;
10
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.HashSet;
16 import java.util.Map;
17 import java.util.Set;
18
19 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
20 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
21 import org.opendaylight.controller.sal.connector.api.RpcRouter;
22 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
23 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
24 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
25 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
26 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
27 import org.opendaylight.controller.sal.core.api.Provider;
28 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
29 import org.opendaylight.controller.sal.core.api.RpcImplementation;
30 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
31 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
32 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.common.RpcResult;
35 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
36 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
37 import org.osgi.framework.BundleContext;
38 import org.osgi.util.tracker.ServiceTracker;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.base.Optional;
43 import com.google.common.util.concurrent.ListenableFuture;
44
45 public class RemoteRpcProvider implements
46     RpcImplementation,
47     RoutedRpcDefaultImplementation,
48     AutoCloseable,
49     Provider {
50
51   private final Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
52
53   private final ServerImpl server;
54   private final ClientImpl client;
55   private RoutingTableProvider routingTableProvider;
56   private final RpcListener listener = new RpcListener();
57   private final RoutedRpcListener routeChangeListener = new RoutedRpcListener();
58   private ProviderSession brokerSession;
59   private RpcProvisionRegistry rpcProvisionRegistry;
60   private BundleContext context;
61   private ServiceTracker clusterTracker;
62
63   public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
64     this.server = server;
65     this.client = client;
66   }
67
68   public void setRoutingTableProvider(RoutingTableProvider provider) {
69     this.routingTableProvider = provider;
70     client.setRoutingTableProvider(provider);
71   }
72
73   public void setContext(BundleContext context){
74     this.context = context;
75   }
76
77   public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){
78     this.rpcProvisionRegistry = rpcProvisionRegistry;
79   }
80
81   @Override
82   public void onSessionInitiated(ProviderSession session) {
83     brokerSession = session;
84     server.setBrokerSession(session);
85     start();
86   }
87
88   @Override
89   public Set<QName> getSupportedRpcs() {
90     //TODO: Ask Tony if we need to get this from routing table
91     return Collections.emptySet();
92   }
93
94   @Override
95   public Collection<ProviderFunctionality> getProviderFunctionality() {
96     // TODO Auto-generated method stub
97     return null;
98   }
99
100   @Override
101   public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
102     return client.invokeRpc(rpc, input);
103   }
104
105   @Override
106   public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
107     return client.invokeRpc(rpc, identifier, input);
108   }
109
110   public void start() {
111     server.start();
112     client.start();
113     brokerSession.addRpcRegistrationListener(listener);
114     rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this);
115     rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
116
117     announceSupportedRpcs();
118     announceSupportedRoutedRpcs();
119   }
120
121   @Override
122   public void close() throws Exception {
123     unregisterSupportedRpcs();
124     unregisterSupportedRoutedRpcs();
125     server.close();
126     client.close();
127   }
128
129   public void stop() {
130     server.stop();
131     client.stop();
132   }
133
134   /**
135    * Add all the locally registered RPCs in the clustered routing table
136    */
137   private void announceSupportedRpcs(){
138     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
139     for (QName rpc : currentlySupported) {
140       listener.onRpcImplementationAdded(rpc);
141     }
142   }
143
144   /**
145    * Add all the locally registered Routed RPCs in the clustered routing table
146    */
147   private void announceSupportedRoutedRpcs(){
148
149     //TODO: announce all routed RPCs as well
150
151   }
152
153   /**
154    * Un-Register all the supported RPCs from clustered routing table
155    */
156   private void unregisterSupportedRpcs(){
157     Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
158     //TODO: remove all routed RPCs as well
159     for (QName rpc : currentlySupported) {
160       listener.onRpcImplementationRemoved(rpc);
161     }
162   }
163
164   /**
165    * Un-Register all the locally supported Routed RPCs from clustered routing table
166    */
167   private void unregisterSupportedRoutedRpcs(){
168
169     //TODO: remove all routed RPCs as well
170
171   }
172
173   private RoutingTable<RpcRouter.RouteIdentifier, String> getRoutingTable(){
174     Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable =
175         routingTableProvider.getRoutingTable();
176
177     checkState(routingTable.isPresent(), "Routing table is null");
178
179     return routingTable.get();
180   }
181
182   /**
183    * Listener for rpc registrations in broker
184    */
185   private class RpcListener implements RpcRegistrationListener {
186
187     @Override
188     public void onRpcImplementationAdded(QName rpc) {
189
190       _logger.debug("Adding registration for [{}]", rpc);
191       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
192       routeId.setType(rpc);
193
194       RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
195
196       try {
197         routingTable.addGlobalRoute(routeId, server.getServerAddress());
198         _logger.debug("Route added [{}-{}]", routeId, server.getServerAddress());
199
200       } catch (RoutingTableException | SystemException e) {
201         //TODO: This can be thrown when route already exists in the table. Broker
202         //needs to handle this.
203         _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
204
205       }
206     }
207
208     @Override
209     public void onRpcImplementationRemoved(QName rpc) {
210
211       _logger.debug("Removing registration for [{}]", rpc);
212       RouteIdentifierImpl routeId = new RouteIdentifierImpl();
213       routeId.setType(rpc);
214
215       RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
216
217       try {
218         routingTable.removeGlobalRoute(routeId);
219       } catch (RoutingTableException | SystemException e) {
220         _logger.error("Route delete failed {}", e);
221       }
222     }
223   }
224
225   /**
226    * Listener for Routed Rpc registrations in broker
227    */
228   private class RoutedRpcListener
229       implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier> {
230
231     /**
232      *
233      * @param routeChange
234      */
235     @Override
236     public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
237       Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
238       announce(getRouteIdentifiers(announcements));
239
240       Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
241       remove(getRouteIdentifiers(removals));
242     }
243
244     /**
245      *
246      * @param announcements
247      */
248     private void announce(Set<RpcRouter.RouteIdentifier> announcements) {
249       _logger.debug("Announcing [{}]", announcements);
250       RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
251       try {
252         routingTable.addRoutes(announcements, server.getServerAddress());
253       } catch (RoutingTableException | SystemException e) {
254         _logger.error("Route announcement failed {}", e);
255       }
256     }
257
258     /**
259      *
260      * @param removals
261      */
262     private void remove(Set<RpcRouter.RouteIdentifier> removals){
263       _logger.debug("Removing [{}]", removals);
264       RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
265       try {
266         routingTable.removeRoutes(removals, server.getServerAddress());
267       } catch (RoutingTableException | SystemException e) {
268         _logger.error("Route removal failed {}", e);
269       }
270     }
271
272     /**
273      *
274      * @param changes
275      * @return
276      */
277     private Set<RpcRouter.RouteIdentifier> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
278       RouteIdentifierImpl routeId = null;
279       Set<RpcRouter.RouteIdentifier> routeIdSet = new HashSet<RpcRouter.RouteIdentifier>();
280
281       for (RpcRoutingContext context : changes.keySet()){
282         routeId = new RouteIdentifierImpl();
283         routeId.setType(context.getRpc());
284         //routeId.setContext(context.getContext());
285
286         for (InstanceIdentifier instanceId : changes.get(context)){
287           routeId.setRoute(instanceId);
288           routeIdSet.add(routeId);
289         }
290       }
291       return routeIdSet;
292     }
293
294   }
295 }