X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fremoterpc%2FRemoteRpcProvider.java;h=76df25682e1b79df3f7762ac1fb8094dac5eda18;hp=bf205fc38d54a57ebc2ac351a3436f1d0215dbd9;hb=b6e3e11ddcea90a2ae7f93c179625941e8e22ccd;hpb=2af7af1c16a44873ff474ad8429e41f762c54e73 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java index bf205fc38d..76df25682e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java @@ -8,86 +8,288 @@ package org.opendaylight.controller.sal.connector.remoterpc; +import static com.google.common.base.Preconditions.checkState; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; +import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; +import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; +import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.controller.sal.core.api.RpcRoutingContext; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.osgi.framework.BundleContext; +import org.osgi.util.tracker.ServiceTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.Set; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ListenableFuture; -public class RemoteRpcProvider implements - RemoteRpcServer, - RemoteRpcClient, +public class RemoteRpcProvider implements + RpcImplementation, + RoutedRpcDefaultImplementation, + AutoCloseable, Provider { - private final ServerImpl server; - private final ClientImpl client; - private RoutingTableProvider provider; + private final Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class); - @Override - public void setRoutingTableProvider(RoutingTableProvider provider) { - this.provider = provider; - server.setRoutingTableProvider(provider); - client.setRoutingTableProvider(provider); + private final ServerImpl server; + private final ClientImpl client; + private RoutingTableProvider routingTableProvider; + private final RpcListener listener = new RpcListener(); + private final RoutedRpcListener routeChangeListener = new RoutedRpcListener(); + private ProviderSession brokerSession; + private RpcProvisionRegistry rpcProvisionRegistry; + private BundleContext context; + private ServiceTracker clusterTracker; + + public RemoteRpcProvider(ServerImpl server, ClientImpl client) { + this.server = server; + this.client = client; + } + + public void setRoutingTableProvider(RoutingTableProvider provider) { + this.routingTableProvider = provider; + client.setRoutingTableProvider(provider); + } + + public void setContext(BundleContext context){ + this.context = context; + } + + public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){ + this.rpcProvisionRegistry = rpcProvisionRegistry; + } + + @Override + public void onSessionInitiated(ProviderSession session) { + brokerSession = session; + server.setBrokerSession(session); + start(); + } + + @Override + public Set getSupportedRpcs() { + //TODO: Ask Tony if we need to get this from routing table + return Collections.emptySet(); + } + + @Override + public Collection getProviderFunctionality() { + // TODO Auto-generated method stub + return null; + } + + @Override + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { + return client.invokeRpc(rpc, input); + } + + @Override + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { + return client.invokeRpc(rpc, identifier, input); + } + + public void start() { + server.start(); + client.start(); + brokerSession.addRpcRegistrationListener(listener); + rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this); + rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); + + announceSupportedRpcs(); + announceSupportedRoutedRpcs(); + } + + @Override + public void close() throws Exception { + unregisterSupportedRpcs(); + unregisterSupportedRoutedRpcs(); + server.close(); + client.close(); + } + + public void stop() { + server.stop(); + client.stop(); + } + + /** + * Add all the locally registered RPCs in the clustered routing table + */ + private void announceSupportedRpcs(){ + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + listener.onRpcImplementationAdded(rpc); } - - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { - return client.invokeRpc(rpc, input); + } + + /** + * Add all the locally registered Routed RPCs in the clustered routing table + */ + private void announceSupportedRoutedRpcs(){ + + //TODO: announce all routed RPCs as well + + } + + /** + * Un-Register all the supported RPCs from clustered routing table + */ + private void unregisterSupportedRpcs(){ + Set currentlySupported = brokerSession.getSupportedRpcs(); + //TODO: remove all routed RPCs as well + for (QName rpc : currentlySupported) { + listener.onRpcImplementationRemoved(rpc); } - + } + + /** + * Un-Register all the locally supported Routed RPCs from clustered routing table + */ + private void unregisterSupportedRoutedRpcs(){ + + //TODO: remove all routed RPCs as well + + } + + private RoutingTable, String> getRoutingTable(){ + Optional, String>> routingTable = + routingTableProvider.getRoutingTable(); + + checkState(routingTable.isPresent(), "Routing table is null"); + + return routingTable.get(); + } + + /** + * Listener for rpc registrations in broker + */ + private class RpcListener implements RpcRegistrationListener { + @Override - public Set getSupportedRpcs() { - return client.getSupportedRpcs(); - } - - - public RemoteRpcProvider(ServerImpl server, ClientImpl client) { - this.server = server; - this.client = client; - } - - public void setBrokerSession(ProviderSession session) { - server.setBrokerSession(session); - } -// public void setServerPool(ExecutorService serverPool) { -// server.setServerPool(serverPool); -// } - public void start() { - //when listener was being invoked and addRPCImplementation was being - //called the client was null. - server.setClient(client); - server.start(); - client.start(); + public void onRpcImplementationAdded(QName rpc) { + + _logger.debug("Adding registration for [{}]", rpc); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(rpc); + + RoutingTable, String> routingTable = getRoutingTable(); + + try { + routingTable.addGlobalRoute(routeId, server.getServerAddress()); + _logger.debug("Route added [{}-{}]", routeId, server.getServerAddress()); + } catch (RoutingTableException | SystemException e) { + //TODO: This can be thrown when route already exists in the table. Broker + //needs to handle this. + _logger.error("Unhandled exception while adding global route to routing table [{}]", e); + } } - @Override - public Collection getProviderFunctionality() { - // TODO Auto-generated method stub - return null; + public void onRpcImplementationRemoved(QName rpc) { + + _logger.debug("Removing registration for [{}]", rpc); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(rpc); + + RoutingTable, String> routingTable = getRoutingTable(); + + try { + routingTable.removeGlobalRoute(routeId); + } catch (RoutingTableException | SystemException e) { + _logger.error("Route delete failed {}", e); + } } - - + } + + /** + * Listener for Routed Rpc registrations in broker + */ + private class RoutedRpcListener + implements RouteChangeListener { + + /** + * + * @param routeChange + */ @Override - public void onSessionInitiated(ProviderSession session) { - server.setBrokerSession(session); - start(); + public void onRouteChange(RouteChange routeChange) { + Map> announcements = routeChange.getAnnouncements(); + announce(getRouteIdentifiers(announcements)); + + Map> removals = routeChange.getRemovals(); + remove(getRouteIdentifiers(removals)); } - - - public void close() throws Exception { - server.close(); - client.close(); + + /** + * + * @param announcements + */ + private void announce(Set> announcements) { + _logger.debug("Announcing [{}]", announcements); + RoutingTable, String> routingTable = getRoutingTable(); + try { + routingTable.addRoutes(announcements, server.getServerAddress()); + } catch (RoutingTableException | SystemException e) { + _logger.error("Route announcement failed {}", e); + } } - @Override - public void stop() { - server.stop(); - client.stop(); + /** + * + * @param removals + */ + private void remove(Set> removals){ + _logger.debug("Removing [{}]", removals); + RoutingTable, String> routingTable = getRoutingTable(); + try { + routingTable.removeRoutes(removals, server.getServerAddress()); + } catch (RoutingTableException | SystemException e) { + _logger.error("Route removal failed {}", e); + } + } + + /** + * + * @param changes + * @return + */ + private Set> getRouteIdentifiers(Map> changes) { + RouteIdentifierImpl routeId = null; + Set> routeIdSet = new HashSet<>(); + + for (RpcRoutingContext context : changes.keySet()){ + routeId = new RouteIdentifierImpl(); + routeId.setType(context.getRpc()); + //routeId.setContext(context.getContext()); + + for (InstanceIdentifier instanceId : changes.get(context)){ + routeId.setRoute(instanceId); + routeIdSet.add(routeId); + } + } + return routeIdSet; } + + } }