2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.sal.connector.remoterpc;
11 import static com.google.common.base.Preconditions.checkNotNull;
13 import java.util.Collection;
14 import java.util.Collections;
15 import java.util.HashSet;
19 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
20 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
21 import org.opendaylight.controller.sal.connector.api.RpcRouter;
22 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
23 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
24 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
25 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
26 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
27 import org.opendaylight.controller.sal.core.api.Provider;
28 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
29 import org.opendaylight.controller.sal.core.api.RpcImplementation;
30 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
31 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
32 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.common.RpcResult;
35 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
36 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
37 import org.osgi.framework.BundleContext;
38 import org.osgi.util.tracker.ServiceTracker;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import com.google.common.base.Optional;
43 import com.google.common.util.concurrent.ListenableFuture;
45 public class RemoteRpcProvider implements
47 RoutedRpcDefaultImplementation,
51 private final Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
53 private final ServerImpl server;
54 private final ClientImpl client;
55 private RoutingTableProvider routingTableProvider;
56 private final RpcListener listener = new RpcListener();
57 private final RoutedRpcListener routeChangeListener = new RoutedRpcListener();
58 private ProviderSession brokerSession;
59 private RpcProvisionRegistry rpcProvisionRegistry;
60 private BundleContext context;
61 private ServiceTracker clusterTracker;
63 public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
68 public void setRoutingTableProvider(RoutingTableProvider provider) {
69 this.routingTableProvider = provider;
70 client.setRoutingTableProvider(provider);
73 public void setContext(BundleContext context){
74 this.context = context;
77 public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){
78 this.rpcProvisionRegistry = rpcProvisionRegistry;
82 public void onSessionInitiated(ProviderSession session) {
83 brokerSession = session;
84 server.setBrokerSession(session);
89 public Set<QName> getSupportedRpcs() {
90 //TODO: Ask Tony if we need to get this from routing table
91 return Collections.emptySet();
95 public Collection<ProviderFunctionality> getProviderFunctionality() {
96 // TODO Auto-generated method stub
101 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
102 return client.invokeRpc(rpc, input);
106 public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
107 return client.invokeRpc(rpc, identifier, input);
110 public void start() {
113 brokerSession.addRpcRegistrationListener(listener);
114 rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this);
115 rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
117 announceSupportedRpcs();
118 announceSupportedRoutedRpcs();
122 public void close() throws Exception {
123 unregisterSupportedRpcs();
124 unregisterSupportedRoutedRpcs();
135 * Add all the locally registered RPCs in the clustered routing table
137 private void announceSupportedRpcs(){
138 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
139 for (QName rpc : currentlySupported) {
140 listener.onRpcImplementationAdded(rpc);
145 * Add all the locally registered Routed RPCs in the clustered routing table
147 private void announceSupportedRoutedRpcs(){
149 //TODO: announce all routed RPCs as well
154 * Un-Register all the supported RPCs from clustered routing table
156 private void unregisterSupportedRpcs(){
157 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
158 //TODO: remove all routed RPCs as well
159 for (QName rpc : currentlySupported) {
160 listener.onRpcImplementationRemoved(rpc);
165 * Un-Register all the locally supported Routed RPCs from clustered routing table
167 private void unregisterSupportedRoutedRpcs(){
169 //TODO: remove all routed RPCs as well
173 private RoutingTable<RpcRouter.RouteIdentifier, String> getRoutingTable(){
174 Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable =
175 routingTableProvider.getRoutingTable();
177 checkNotNull(routingTable.isPresent(), "Routing table is null");
179 return routingTable.get();
183 * Listener for rpc registrations in broker
185 private class RpcListener implements RpcRegistrationListener {
188 public void onRpcImplementationAdded(QName rpc) {
190 _logger.debug("Adding registration for [{}]", rpc);
191 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
192 routeId.setType(rpc);
194 RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
197 routingTable.addGlobalRoute(routeId, server.getServerAddress());
198 _logger.debug("Route added [{}-{}]", routeId, server.getServerAddress());
200 } catch (RoutingTableException | SystemException e) {
201 //TODO: This can be thrown when route already exists in the table. Broker
202 //needs to handle this.
203 _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
209 public void onRpcImplementationRemoved(QName rpc) {
211 _logger.debug("Removing registration for [{}]", rpc);
212 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
213 routeId.setType(rpc);
215 RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
218 routingTable.removeGlobalRoute(routeId);
219 } catch (RoutingTableException | SystemException e) {
220 _logger.error("Route delete failed {}", e);
226 * Listener for Routed Rpc registrations in broker
228 private class RoutedRpcListener
229 implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier> {
236 public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
237 Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
238 announce(getRouteIdentifiers(announcements));
240 Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
241 remove(getRouteIdentifiers(removals));
246 * @param announcements
248 private void announce(Set<RpcRouter.RouteIdentifier> announcements) {
249 _logger.debug("Announcing [{}]", announcements);
250 RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
252 routingTable.addRoutes(announcements, server.getServerAddress());
253 } catch (RoutingTableException | SystemException e) {
254 _logger.error("Route announcement failed {}", e);
262 private void remove(Set<RpcRouter.RouteIdentifier> removals){
263 _logger.debug("Removing [{}]", removals);
264 RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
266 routingTable.removeRoutes(removals, server.getServerAddress());
267 } catch (RoutingTableException | SystemException e) {
268 _logger.error("Route removal failed {}", e);
277 private Set<RpcRouter.RouteIdentifier> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
278 RouteIdentifierImpl routeId = null;
279 Set<RpcRouter.RouteIdentifier> routeIdSet = new HashSet<RpcRouter.RouteIdentifier>();
281 for (RpcRoutingContext context : changes.keySet()){
282 routeId = new RouteIdentifierImpl();
283 routeId.setType(context.getRpc());
284 //routeId.setContext(context.getContext());
286 for (InstanceIdentifier instanceId : changes.get(context)){
287 routeId.setRoute(instanceId);
288 routeIdSet.add(routeId);