/* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.sal.connector.remoterpc; import static com.google.common.base.Preconditions.checkNotNull; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Provider; import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation; import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; import org.opendaylight.controller.sal.core.api.RpcRoutingContext; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.osgi.framework.BundleContext; import org.osgi.util.tracker.ServiceTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; public class RemoteRpcProvider implements RpcImplementation, RoutedRpcDefaultImplementation, AutoCloseable, Provider { private final Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class); private final ServerImpl server; private final ClientImpl client; private RoutingTableProvider routingTableProvider; private final RpcListener listener = new RpcListener(); private final RoutedRpcListener routeChangeListener = new RoutedRpcListener(); private ProviderSession brokerSession; private RpcProvisionRegistry rpcProvisionRegistry; private BundleContext context; private ServiceTracker clusterTracker; public RemoteRpcProvider(ServerImpl server, ClientImpl client) { this.server = server; this.client = client; } public void setRoutingTableProvider(RoutingTableProvider provider) { this.routingTableProvider = provider; client.setRoutingTableProvider(provider); } public void setContext(BundleContext context){ this.context = context; } public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){ this.rpcProvisionRegistry = rpcProvisionRegistry; } @Override public void onSessionInitiated(ProviderSession session) { brokerSession = session; server.setBrokerSession(session); start(); } @Override public Set getSupportedRpcs() { //TODO: Ask Tony if we need to get this from routing table return Collections.emptySet(); } @Override public Collection getProviderFunctionality() { // TODO Auto-generated method stub return null; } @Override public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { return client.invokeRpc(rpc, input); } @Override public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { return client.invokeRpc(rpc, identifier, input); } public void start() { server.start(); client.start(); brokerSession.addRpcRegistrationListener(listener); rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this); rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener); announceSupportedRpcs(); announceSupportedRoutedRpcs(); } @Override public void close() throws Exception { unregisterSupportedRpcs(); unregisterSupportedRoutedRpcs(); server.close(); client.close(); } public void stop() { server.stop(); client.stop(); } /** * Add all the locally registered RPCs in the clustered routing table */ private void announceSupportedRpcs(){ Set currentlySupported = brokerSession.getSupportedRpcs(); for (QName rpc : currentlySupported) { listener.onRpcImplementationAdded(rpc); } } /** * Add all the locally registered Routed RPCs in the clustered routing table */ private void announceSupportedRoutedRpcs(){ //TODO: announce all routed RPCs as well } /** * Un-Register all the supported RPCs from clustered routing table */ private void unregisterSupportedRpcs(){ Set currentlySupported = brokerSession.getSupportedRpcs(); //TODO: remove all routed RPCs as well for (QName rpc : currentlySupported) { listener.onRpcImplementationRemoved(rpc); } } /** * Un-Register all the locally supported Routed RPCs from clustered routing table */ private void unregisterSupportedRoutedRpcs(){ //TODO: remove all routed RPCs as well } private RoutingTable getRoutingTable(){ Optional> routingTable = routingTableProvider.getRoutingTable(); checkNotNull(routingTable.isPresent(), "Routing table is null"); return routingTable.get(); } /** * Listener for rpc registrations in broker */ private class RpcListener implements RpcRegistrationListener { @Override public void onRpcImplementationAdded(QName rpc) { _logger.debug("Adding registration for [{}]", rpc); RouteIdentifierImpl routeId = new RouteIdentifierImpl(); routeId.setType(rpc); RoutingTable routingTable = getRoutingTable(); try { routingTable.addGlobalRoute(routeId, server.getServerAddress()); _logger.debug("Route added [{}-{}]", routeId, server.getServerAddress()); } catch (RoutingTableException | SystemException e) { //TODO: This can be thrown when route already exists in the table. Broker //needs to handle this. _logger.error("Unhandled exception while adding global route to routing table [{}]", e); } } @Override public void onRpcImplementationRemoved(QName rpc) { _logger.debug("Removing registration for [{}]", rpc); RouteIdentifierImpl routeId = new RouteIdentifierImpl(); routeId.setType(rpc); RoutingTable routingTable = getRoutingTable(); try { routingTable.removeGlobalRoute(routeId); } catch (RoutingTableException | SystemException e) { _logger.error("Route delete failed {}", e); } } } /** * Listener for Routed Rpc registrations in broker */ private class RoutedRpcListener implements RouteChangeListener { /** * * @param routeChange */ @Override public void onRouteChange(RouteChange routeChange) { Map> announcements = routeChange.getAnnouncements(); announce(getRouteIdentifiers(announcements)); Map> removals = routeChange.getRemovals(); remove(getRouteIdentifiers(removals)); } /** * * @param announcements */ private void announce(Set announcements) { _logger.debug("Announcing [{}]", announcements); RoutingTable routingTable = getRoutingTable(); try { routingTable.addRoutes(announcements, server.getServerAddress()); } catch (RoutingTableException | SystemException e) { _logger.error("Route announcement failed {}", e); } } /** * * @param removals */ private void remove(Set removals){ _logger.debug("Removing [{}]", removals); RoutingTable routingTable = getRoutingTable(); try { routingTable.removeRoutes(removals, server.getServerAddress()); } catch (RoutingTableException | SystemException e) { _logger.error("Route removal failed {}", e); } } /** * * @param changes * @return */ private Set getRouteIdentifiers(Map> changes) { RouteIdentifierImpl routeId = null; Set routeIdSet = new HashSet(); for (RpcRoutingContext context : changes.keySet()){ routeId = new RouteIdentifierImpl(); routeId.setType(context.getRpc()); //routeId.setContext(context.getContext()); for (InstanceIdentifier instanceId : changes.get(context)){ routeId.setRoute(instanceId); routeIdSet.add(routeId); } } return routeIdSet; } } }