package org.opendaylight.controller.sal.connector.remoterpc;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.osgi.framework.BundleContext;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.Set;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
-public class RemoteRpcProvider implements
- RemoteRpcServer,
- RemoteRpcClient,
+public class RemoteRpcProvider implements
+ RpcImplementation,
+ RoutedRpcDefaultImplementation,
+ AutoCloseable,
Provider {
- private final ServerImpl server;
- private final ClientImpl client;
- private RoutingTableProvider provider;
+ private final Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
- @Override
- public void setRoutingTableProvider(RoutingTableProvider provider) {
- this.provider = provider;
- server.setRoutingTableProvider(provider);
- client.setRoutingTableProvider(provider);
+ private final ServerImpl server;
+ private final ClientImpl client;
+ private RoutingTableProvider routingTableProvider;
+ private final RpcListener listener = new RpcListener();
+ private final RoutedRpcListener routeChangeListener = new RoutedRpcListener();
+ private ProviderSession brokerSession;
+ private RpcProvisionRegistry rpcProvisionRegistry;
+ private BundleContext context;
+ private ServiceTracker<?, ?> clusterTracker;
+
+ public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
+ this.server = server;
+ this.client = client;
+ }
+
+ public void setRoutingTableProvider(RoutingTableProvider provider) {
+ this.routingTableProvider = provider;
+ client.setRoutingTableProvider(provider);
+ }
+
+ public void setContext(BundleContext context){
+ this.context = context;
+ }
+
+ public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){
+ this.rpcProvisionRegistry = rpcProvisionRegistry;
+ }
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ brokerSession = session;
+ server.setBrokerSession(session);
+ start();
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ //TODO: Ask Tony if we need to get this from routing table
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+ return client.invokeRpc(rpc, input);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ return client.invokeRpc(rpc, identifier, input);
+ }
+
+ public void start() {
+ server.start();
+ client.start();
+ brokerSession.addRpcRegistrationListener(listener);
+ rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this);
+ rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
+
+ announceSupportedRpcs();
+ announceSupportedRoutedRpcs();
+ }
+
+ @Override
+ public void close() throws Exception {
+ unregisterSupportedRpcs();
+ unregisterSupportedRoutedRpcs();
+ server.close();
+ client.close();
+ }
+
+ public void stop() {
+ server.stop();
+ client.stop();
+ }
+
+ /**
+ * Add all the locally registered RPCs in the clustered routing table
+ */
+ private void announceSupportedRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);
}
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
- return client.invokeRpc(rpc, input);
+ }
+
+ /**
+ * Add all the locally registered Routed RPCs in the clustered routing table
+ */
+ private void announceSupportedRoutedRpcs(){
+
+ //TODO: announce all routed RPCs as well
+
+ }
+
+ /**
+ * Un-Register all the supported RPCs from clustered routing table
+ */
+ private void unregisterSupportedRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ //TODO: remove all routed RPCs as well
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationRemoved(rpc);
}
-
+ }
+
+ /**
+ * Un-Register all the locally supported Routed RPCs from clustered routing table
+ */
+ private void unregisterSupportedRoutedRpcs(){
+
+ //TODO: remove all routed RPCs as well
+
+ }
+
+ private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> getRoutingTable(){
+ Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable =
+ routingTableProvider.getRoutingTable();
+
+ checkState(routingTable.isPresent(), "Routing table is null");
+
+ return routingTable.get();
+ }
+
+ /**
+ * Listener for rpc registrations in broker
+ */
+ private class RpcListener implements RpcRegistrationListener {
+
@Override
- public Set<QName> getSupportedRpcs() {
- return client.getSupportedRpcs();
- }
-
-
- public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
- this.server = server;
- this.client = client;
- }
-
- public void setBrokerSession(ProviderSession session) {
- server.setBrokerSession(session);
- }
-// public void setServerPool(ExecutorService serverPool) {
-// server.setServerPool(serverPool);
-// }
- public void start() {
- //when listener was being invoked and addRPCImplementation was being
- //called the client was null.
- server.setClient(client);
- server.start();
- client.start();
+ public void onRpcImplementationAdded(QName rpc) {
+
+ _logger.debug("Adding registration for [{}]", rpc);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+
+ RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable = getRoutingTable();
+
+ try {
+ routingTable.addGlobalRoute(routeId, server.getServerAddress());
+ _logger.debug("Route added [{}-{}]", routeId, server.getServerAddress());
+ } catch (RoutingTableException | SystemException e) {
+ //TODO: This can be thrown when route already exists in the table. Broker
+ //needs to handle this.
+ _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+ }
}
-
@Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- // TODO Auto-generated method stub
- return null;
+ public void onRpcImplementationRemoved(QName rpc) {
+
+ _logger.debug("Removing registration for [{}]", rpc);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+
+ RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable = getRoutingTable();
+
+ try {
+ routingTable.removeGlobalRoute(routeId);
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route delete failed {}", e);
+ }
}
-
-
+ }
+
+ /**
+ * Listener for Routed Rpc registrations in broker
+ */
+ private class RoutedRpcListener
+ implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier> {
+
+ /**
+ *
+ * @param routeChange
+ */
@Override
- public void onSessionInitiated(ProviderSession session) {
- server.setBrokerSession(session);
- start();
+ public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
+ Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
+ announce(getRouteIdentifiers(announcements));
+
+ Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
+ remove(getRouteIdentifiers(removals));
}
-
-
- public void close() throws Exception {
- server.close();
- client.close();
+
+ /**
+ *
+ * @param announcements
+ */
+ private void announce(Set<RpcRouter.RouteIdentifier<?, ?, ?>> announcements) {
+ _logger.debug("Announcing [{}]", announcements);
+ RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable = getRoutingTable();
+ try {
+ routingTable.addRoutes(announcements, server.getServerAddress());
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route announcement failed {}", e);
+ }
}
- @Override
- public void stop() {
- server.stop();
- client.stop();
+ /**
+ *
+ * @param removals
+ */
+ private void remove(Set<RpcRouter.RouteIdentifier<?, ?, ?>> removals){
+ _logger.debug("Removing [{}]", removals);
+ RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable = getRoutingTable();
+ try {
+ routingTable.removeRoutes(removals, server.getServerAddress());
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route removal failed {}", e);
+ }
+ }
+
+ /**
+ *
+ * @param changes
+ * @return
+ */
+ private Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
+ RouteIdentifierImpl routeId = null;
+ Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdSet = new HashSet<>();
+
+ for (RpcRoutingContext context : changes.keySet()){
+ routeId = new RouteIdentifierImpl();
+ routeId.setType(context.getRpc());
+ //routeId.setContext(context.getContext());
+
+ for (InstanceIdentifier instanceId : changes.get(context)){
+ routeId.setRoute(instanceId);
+ routeIdSet.add(routeId);
+ }
+ }
+ return routeIdSet;
}
+
+ }
}