2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.sal.connector.remoterpc;
11 import com.google.common.base.Optional;
12 import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
13 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
14 import org.opendaylight.controller.sal.connector.api.RpcRouter;
15 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
18 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
20 import org.opendaylight.controller.sal.core.api.Provider;
21 import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
22 import org.opendaylight.controller.sal.core.api.RpcImplementation;
23 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
24 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
25 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
26 import org.opendaylight.yangtools.yang.common.QName;
27 import org.opendaylight.yangtools.yang.common.RpcResult;
28 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
29 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
30 import org.osgi.framework.BundleContext;
31 import org.osgi.util.tracker.ServiceTracker;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashSet;
41 import static com.google.common.base.Preconditions.checkNotNull;
43 public class RemoteRpcProvider implements
45 RoutedRpcDefaultImplementation,
49 private Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
51 private final ServerImpl server;
52 private final ClientImpl client;
53 private RoutingTableProvider routingTableProvider;
54 private final RpcListener listener = new RpcListener();
55 private final RoutedRpcListener routeChangeListener = new RoutedRpcListener();
56 private ProviderSession brokerSession;
57 private RpcProvisionRegistry rpcProvisionRegistry;
58 private BundleContext context;
59 private ServiceTracker clusterTracker;
/**
 * Creates the provider from its server and client collaborators.
 *
 * NOTE(review): the constructor body is not visible in this excerpt. Since the
 * {@code server} and {@code client} fields are final and have no initializers,
 * it presumably assigns both from the parameters - confirm.
 *
 * @param server server side of the remote RPC connector
 * @param client client side of the remote RPC connector
 */
61 public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
66 public void setRoutingTableProvider(RoutingTableProvider provider) {
67 this.routingTableProvider = provider;
68 client.setRoutingTableProvider(provider);
71 public void setContext(BundleContext context){
72 this.context = context;
75 public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){
76 this.rpcProvisionRegistry = rpcProvisionRegistry;
/**
 * Remembers the broker session and passes it on to the server.
 *
 * NOTE(review): the end of this method is not visible in this excerpt; further
 * statements may follow - confirm against the full file.
 *
 * @param session provider session handed in by the caller
 */
80 public void onSessionInitiated(ProviderSession session) {
81 brokerSession = session;
82 server.setBrokerSession(session);
87 public Set<QName> getSupportedRpcs() {
88 //TODO: Ask Tony if we need to get this from routing table
89 return Collections.emptySet();
/**
 * Returns the provider functionality of this provider.
 *
 * NOTE(review): only the auto-generated stub marker is visible here; the
 * return statement is not part of this excerpt - confirm what is returned.
 */
93 public Collection<ProviderFunctionality> getProviderFunctionality() {
94 // TODO Auto-generated method stub
99 public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
100 return client.invokeRpc(rpc, input);
104 public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
105 return client.invokeRpc(rpc, identifier, input);
/**
 * Hooks this provider into the broker: registers the RPC registration listener
 * on the broker session, installs this provider as the routed RPC default
 * delegate, registers the route change listener, and then announces the RPCs
 * that are already supported.
 *
 * NOTE(review): lines directly after the signature are missing from this
 * excerpt; additional start-up steps may precede the registrations - confirm.
 */
108 public void start() {
// Listener registration has to happen before the announcements below are made.
111 brokerSession.addRpcRegistrationListener(listener);
112 rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this);
113 rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
// Publish what the broker session already supports.
115 announceSupportedRpcs();
116 announceSupportedRoutedRpcs();
/**
 * Withdraws the supported RPCs and routed RPCs from the routing table.
 *
 * NOTE(review): the remainder of this method is not visible in this excerpt;
 * further shutdown steps may follow - confirm against the full file.
 *
 * @throws Exception declared by the signature; the visible calls do not show
 *                   which failures are propagated
 */
120 public void close() throws Exception {
121 unregisterSupportedRpcs();
122 unregisterSupportedRoutedRpcs();
133 * Add all the locally registered RPCs in the clustered routing table
135 private void announceSupportedRpcs(){
136 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
137 for (QName rpc : currentlySupported) {
138 listener.onRpcImplementationAdded(rpc);
143 * Add all the locally registered Routed RPCs in the clustered routing table
145 private void announceSupportedRoutedRpcs(){
147 //TODO: announce all routed RPCs as well
152 * Un-Register all the supported RPCs from clustered routing table
154 private void unregisterSupportedRpcs(){
155 Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
156 //TODO: remove all routed RPCs as well
157 for (QName rpc : currentlySupported) {
158 listener.onRpcImplementationRemoved(rpc);
163 * Un-Register all the locally supported Routed RPCs from clustered routing table
165 private void unregisterSupportedRoutedRpcs(){
167 //TODO: remove all routed RPCs as well
171 private RoutingTable<RpcRouter.RouteIdentifier, String> getRoutingTable(){
172 Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable =
173 routingTableProvider.getRoutingTable();
175 checkNotNull(routingTable.isPresent(), "Routing table is null");
177 return routingTable.get();
181 * Listener for rpc registrations in broker
183 private class RpcListener implements RpcRegistrationListener {
186 public void onRpcImplementationAdded(QName rpc) {
188 _logger.debug("Adding registration for [{}]", rpc);
189 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
190 routeId.setType(rpc);
192 RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
195 routingTable.addGlobalRoute(routeId, server.getServerAddress());
196 _logger.debug("Route added [{}-{}]", routeId, server.getServerAddress());
198 } catch (RoutingTableException | SystemException e) {
199 //TODO: This can be thrown when route already exists in the table. Broker
200 //needs to handle this.
201 _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
207 public void onRpcImplementationRemoved(QName rpc) {
209 _logger.debug("Removing registration for [{}]", rpc);
210 RouteIdentifierImpl routeId = new RouteIdentifierImpl();
211 routeId.setType(rpc);
213 RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
216 routingTable.removeGlobalRoute(routeId);
217 } catch (RoutingTableException | SystemException e) {
218 _logger.error("Route delete failed {}", e);
224 * Listener for Routed Rpc registrations in broker
226 private class RoutedRpcListener
227 implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier> {
234 public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
235 Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
236 announce(getRouteIdentifiers(announcements));
238 Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
239 remove(getRouteIdentifiers(removals));
244 * @param announcements
246 private void announce(Set<RpcRouter.RouteIdentifier> announcements) {
247 _logger.debug("Announcing [{}]", announcements);
248 RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
250 routingTable.addRoutes(announcements, server.getServerAddress());
251 } catch (RoutingTableException | SystemException e) {
252 _logger.error("Route announcement failed {}", e);
260 private void remove(Set<RpcRouter.RouteIdentifier> removals){
261 _logger.debug("Removing [{}]", removals);
262 RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
264 routingTable.removeRoutes(removals, server.getServerAddress());
265 } catch (RoutingTableException | SystemException e) {
266 _logger.error("Route removal failed {}", e);
/**
 * Converts a map of routing context to instance identifiers into a set of
 * route identifiers: the identifier type is taken from the context's RPC and
 * the route from each instance identifier.
 *
 * NOTE(review): one RouteIdentifierImpl is created per context and then
 * mutated and re-added for every instance identifier of that context. The set
 * therefore receives the same object repeatedly, and the object is modified
 * after it has been inserted into a HashSet. It looks like a new
 * RouteIdentifierImpl should be created per instance identifier - confirm and
 * fix together with the part of the method that is not visible here.
 *
 * @param changes routing contexts mapped to the instance identifiers affected
 */
275 private Set<RpcRouter.RouteIdentifier> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
276 RouteIdentifierImpl routeId = null;
277 Set<RpcRouter.RouteIdentifier> routeIdSet = new HashSet<RpcRouter.RouteIdentifier>();
279 for (RpcRoutingContext context : changes.keySet()){
// One identifier object per routing context (see review note above).
280 routeId = new RouteIdentifierImpl();
281 routeId.setType(context.getRpc());
282 //routeId.setContext(context.getContext());
284 for (InstanceIdentifier instanceId : changes.get(context)){
// Overwrites the route on the shared object, then adds that same object again.
285 routeId.setRoute(instanceId);
286 routeIdSet.add(routeId);