X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fremoterpc%2FServerImpl.java;h=b5a67ff0df97f3d110f1842074617468ce836b61;hb=607df672b92e06f36723397481408a257c98564e;hp=83b93858cff392c7ab02e2b9cc0499c45b5798dc;hpb=de12565a7795af98788f8150eb0072f9c985f4a1;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java index 83b93858cf..b5a67ff0df 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java @@ -8,278 +8,422 @@ package org.opendaylight.controller.sal.connector.remoterpc; import com.google.common.base.Optional; - +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Sets; +import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType; import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; -import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.controller.sal.core.api.RpcRoutingContext; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; -import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; import java.util.HashSet; +import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkNotNull; /** - * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling - * async and non-blocking. Note zmq socket is not thread safe 2. Read properties - * from config file using existing(?) ODL properties framework + * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable + * so that it gets route change notifications from routing table. */ -public class ServerImpl implements RemoteRpcServer, RouteChangeListener { +public class ServerImpl implements RemoteRpcServer, RouteChangeListener { - private Logger _logger = LoggerFactory.getLogger(ServerImpl.class); + private Logger _logger = LoggerFactory.getLogger(ServerImpl.class); - private ExecutorService serverPool; + private ExecutorService serverPool; + protected ServerRequestHandler handler; - // private RoutingTable routingTable; - private RoutingTableProvider routingTable; - private Set remoteServices; - private ProviderSession brokerSession; - private ZMQ.Context context; - private ZMQ.Socket replySocket; + private Set remoteServices; + private ProviderSession brokerSession; + private ZMQ.Context context; - private final RpcListener listener = new RpcListener(); + private final RpcListener listener = new RpcListener(); - private final String localUri = Context.getInstance().getLocalUri(); + private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler"; + private final int HANDLER_WORKER_COUNT = 2; + private final int HWM = 200;//high water mark on sockets + private volatile State status = State.STOPPED; - private final int rpcPort; + private String serverAddress; + private int port; - private RpcImplementation client; + private ClientImpl client; - public RpcImplementation getClient() { - return client; - } + private RoutingTableProvider routingTableProvider; - public void setClient(RpcImplementation client) { - this.client = client; - } + public static enum State { + STARTING, STARTED, STOPPED; + } - // Prevent instantiation - public ServerImpl(int rpcPort) { - this.rpcPort = rpcPort; - } + public ServerImpl(int port) { + this.port = port; + this.serverAddress = new StringBuilder(findIpAddress()). + append(":"). + append(port). + toString(); + } - public void setBrokerSession(ProviderSession session) { - this.brokerSession = session; - } + public RoutingTableProvider getRoutingTableProvider() { + return routingTableProvider; + } - public ExecutorService getServerPool() { - return serverPool; - } + public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) { + this.routingTableProvider = routingTableProvider; + } - public void setServerPool(ExecutorService serverPool) { - this.serverPool = serverPool; - } + public ClientImpl getClient(){ + return this.client; + } - public void start() { - context = ZMQ.context(1); - serverPool = Executors.newSingleThreadExecutor(); - remoteServices = new HashSet(); + public void setClient(ClientImpl client) { + this.client = client; + } - // Start listening rpc requests - serverPool.execute(receive()); + public State getStatus() { + return this.status; + } - brokerSession.addRpcRegistrationListener(listener); - // routingTable.registerRouteChangeListener(routeChangeListener); + public Optional getHandler() { + return Optional.fromNullable(this.handler); + } - Set currentlySupported = brokerSession.getSupportedRpcs(); - for (QName rpc : currentlySupported) { - listener.onRpcImplementationAdded(rpc); - } + public void setBrokerSession(ProviderSession session) { + this.brokerSession = session; + } - _logger.debug("RPC Server started [{}]", localUri); - } + public Optional getBrokerSession() { + return Optional.fromNullable(this.brokerSession); + } - public void stop() { - // TODO: un-subscribe + public Optional getZmqContext() { + return Optional.fromNullable(this.context); + } - // if (context != null) - // context.term(); - // - // _logger.debug("ZMQ Context is terminated."); + public String getServerAddress() { + return serverAddress; + } - if (serverPool != null) - serverPool.shutdown(); + public String getHandlerAddress() { + return HANDLER_INPROC_ADDRESS; + } - _logger.debug("Thread pool is closed."); - } + /** + * + */ + public void start() { + Preconditions.checkState(State.STOPPED == this.getStatus(), + "Remote RPC Server is already running"); - private Runnable receive() { - return new Runnable() { - public void run() { - - // Bind to RPC reply socket - replySocket = context.socket(ZMQ.REP); - replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort()); - - // Poller enables listening on multiple sockets using a single - // thread - ZMQ.Poller poller = new ZMQ.Poller(1); - poller.register(replySocket, ZMQ.Poller.POLLIN); - try { - // TODO: Add code to restart the thread after exception - while (!Thread.currentThread().isInterrupted()) { - - poller.poll(); - - if (poller.pollin(0)) { - handleRpcCall(); - } - } - } catch (Exception e) { - // log and continue - _logger.error("Unhandled exception [{}]", e); - } finally { - poller.unregister(replySocket); - replySocket.close(); - } - - } - }; - } + status = State.STARTING; + context = ZMQ.context(1); + remoteServices = new HashSet();// + serverPool = Executors.newSingleThreadExecutor();//main server thread + serverPool.execute(receive()); // Start listening rpc requests + brokerSession.addRpcRegistrationListener(listener); - /** - * @throws InterruptedException - * @throws ExecutionException - */ - private void handleRpcCall() { + announceLocalRpcs(); - Message request = parseMessage(replySocket); + registerRemoteRpcs(); - _logger.debug("Received rpc request [{}]", request); + status = State.STARTED; + _logger.info("Remote RPC Server started [{}]", getServerAddress()); + } - // Call broker to process the message then reply - Future> rpc = null; - RpcResult result = null; - try { - rpc = brokerSession.rpc((QName) request.getRoute().getType(), - XmlUtils.xmlToCompositeNode((String) request.getPayload())); + public void stop(){ + close(); + } - result = (rpc != null) ? rpc.get() : null; + /** + * + */ + @Override + public void close() { - } catch (Exception e) { - _logger.debug("Broker threw [{}]", e); - } + if (State.STOPPED == this.getStatus()) return; //do nothing + + unregisterLocalRpcs(); - CompositeNode payload = (result != null) ? result.getResult() : null; + if (serverPool != null) + serverPool.shutdown(); - Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri) - .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build(); + closeZmqContext(); - _logger.debug("Sending rpc response [{}]", response); + status = State.STOPPED; + _logger.info("Remote RPC Server stopped"); + } + + /** + * Closes ZMQ Context. It tries to gracefully terminate the context. If + * termination takes more than a second, its forcefully shutdown. + */ + private void closeZmqContext() { + ExecutorService exec = Executors.newSingleThreadExecutor(); + FutureTask zmqTermination = new FutureTask(new Runnable() { + @Override + public void run() { try { - replySocket.send(Message.serialize(response)); + if (context != null) + context.term(); + _logger.debug("ZMQ Context terminated gracefully!"); } catch (Exception e) { - _logger.debug("rpc response send failed for message [{}]", response); - _logger.debug("{}", e); + _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e); } + } + }, null); + + exec.execute(zmqTermination); + + try { + zmqTermination.get(5L, TimeUnit.SECONDS); + } catch (Exception e) {/*ignore and continue with shutdown*/} + + exec.shutdownNow(); + } + + /** + * Main listener thread that spawns {@link ServerRequestHandler} as workers. + * + * @return + */ + private Runnable receive() { + return new Runnable() { + + @Override + public void run() { + Thread.currentThread().setName("remote-rpc-server"); + _logger.debug("Remote RPC Server main thread starting..."); + + //socket clients connect to (frontend) + ZMQ.Socket clients = context.socket(ZMQ.ROUTER); + + //socket RequestHandlers connect to (backend) + ZMQ.Socket workers = context.socket(ZMQ.DEALER); + + try (SocketPair capturePair = new SocketPair(); + ServerRequestHandler requestHandler = new ServerRequestHandler(context, + brokerSession, + HANDLER_WORKER_COUNT, + HANDLER_INPROC_ADDRESS, + getServerAddress());) { + + handler = requestHandler; + clients.setHWM(HWM); + clients.bind("tcp://*:" + port); + workers.setHWM(HWM); + workers.bind(HANDLER_INPROC_ADDRESS); + //start worker threads + _logger.debug("Remote RPC Server worker threads starting..."); + requestHandler.start(); + //start capture thread + // handlerPool.execute(new CaptureHandler(capturePair.getReceiver())); + // Connect work threads to client threads via a queue + ZMQ.proxy(clients, workers, null);//capturePair.getSender()); + } catch (Exception e) { + _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage()); + } finally { + if (clients != null) clients.close(); + if (workers != null) workers.close(); + _logger.info("Remote RPC Server stopped"); + } + } + }; + } + + /** + * Register the remote RPCs from the routing table into broker + */ + private void registerRemoteRpcs(){ + Optional> routingTableOptional = routingTableProvider.getRoutingTable(); + + Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent"); + + Set remoteRoutes = + routingTableProvider.getRoutingTable().get().getAllRoutes(); + + //filter out all entries that contains local address + //we dont want to register local RPCs as remote + Predicate notLocalAddressFilter = new Predicate(){ + public boolean apply(Map.Entry remoteRoute){ + return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue()); + } + }; + + //filter the entries created by current node + Set filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter); + + for (Map.Entry route : filteredRemoteRoutes){ + onRouteUpdated((String) route.getKey(), "");//value is not needed by broker + } + } + + /** + * Un-Register the local RPCs from the routing table + */ + private void unregisterLocalRpcs(){ + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + listener.onRpcImplementationRemoved(rpc); + } + } + + /** + * Publish all the locally registered RPCs in the routing table + */ + private void announceLocalRpcs(){ + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + listener.onRpcImplementationAdded(rpc); + } + } + + /** + * @param key + * @param value + */ + @Override + public void onRouteUpdated(String key, String value) { + RouteIdentifierImpl rId = new RouteIdentifierImpl(); + try { + _logger.debug("Updating key/value {}-{}", key, value); + brokerSession.addRpcImplementation( + (QName) rId.fromString(key).getType(), client); + + //TODO: Check with Tony for routed rpc + //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client); + } catch (Exception e) { + _logger.info("Route update failed {}", e); } + } + + /** + * @param key + */ + @Override + public void onRouteDeleted(String key) { + //TODO: Broker session needs to be updated to support this + throw new UnsupportedOperationException(); + } + + /** + * Finds IPv4 address of the local VM + * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which + * address will be returned. Read IP from a property file or enhance the code to make it deterministic. + * Should we use IP or hostname? + * + * @return + */ + private String findIpAddress() { + String hostAddress = null; + Enumeration e = null; + try { + e = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e1) { + e1.printStackTrace(); + } + while (e.hasMoreElements()) { - /** - * @param socket - * @return - */ - private Message parseMessage(ZMQ.Socket socket) { + NetworkInterface n = (NetworkInterface) e.nextElement(); - Message msg = null; - try { - byte[] bytes = socket.recv(); - _logger.debug("Received bytes:[{}]", bytes.length); - msg = (Message) Message.deserialize(bytes); - } catch (Throwable t) { - t.printStackTrace(); - } - return msg; + Enumeration ee = n.getInetAddresses(); + while (ee.hasMoreElements()) { + InetAddress i = (InetAddress) ee.nextElement(); + if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) + hostAddress = i.getHostAddress(); + } } + return hostAddress; + + } + + /** + * Listener for rpc registrations + */ + private class RpcListener implements RpcRegistrationListener { @Override - public void onRouteUpdated(String key, Set values) { - RouteIdentifierImpl rId = new RouteIdentifierImpl(); - try { - _logger.debug("Updating key/value {}-{}", key, values); - brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client); + public void onRpcImplementationAdded(QName name) { - } catch (Exception e) { - _logger.info("Route update failed {}", e); - } + //if the service name exists in the set, this notice + //has bounced back from the broker. It should be ignored + if (remoteServices.contains(name)) + return; + + _logger.debug("Adding registration for [{}]", name); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + + RoutingTable routingTable = getRoutingTable(); + + try { + routingTable.addGlobalRoute(routeId.toString(), getServerAddress()); + _logger.debug("Route added [{}-{}]", name, getServerAddress()); + + } catch (RoutingTableException | SystemException e) { + //TODO: This can be thrown when route already exists in the table. Broker + //needs to handle this. + _logger.error("Unhandled exception while adding global route to routing table [{}]", e); + + } } @Override - public void onRouteDeleted(String key) { - // TODO: Broker session needs to be updated to support this - throw new UnsupportedOperationException(); + public void onRpcImplementationRemoved(QName name) { + + _logger.debug("Removing registration for [{}]", name); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + + RoutingTable routingTable = getRoutingTable(); + + try { + routingTable.removeGlobalRoute(routeId.toString()); + } catch (RoutingTableException | SystemException e) { + _logger.error("Route delete failed {}", e); + } } - - /** - * Listener for rpc registrations - */ - private class RpcListener implements RpcRegistrationListener { - - - - @Override - public void onRpcImplementationAdded(QName name) { - - // if the service name exists in the set, this notice - // has bounced back from the broker. It should be ignored - if (remoteServices.contains(name)) - return; - - _logger.debug("Adding registration for [{}]", name); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(name); - - try { - routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri); - _logger.debug("Route added [{}-{}]", name, localUri); - } catch (RoutingTableException | SystemException e) { - // TODO: This can be thrown when route already exists in the - // table. Broker - // needs to handle this. - _logger.error("Unhandled exception while adding global route to routing table [{}]", e); - - } - } - @Override - public void onRpcImplementationRemoved(QName name) { + private RoutingTable getRoutingTable(){ + Optional> routingTable = + routingTableProvider.getRoutingTable(); - _logger.debug("Removing registration for [{}]", name); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(name); + checkNotNull(routingTable.isPresent(), "Routing table is null"); - try { - routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString()); - } catch (RoutingTableException | SystemException e) { - _logger.error("Route delete failed {}", e); - } - } + return routingTable.get(); } + } + + /* + * Listener for Route changes in broker. Broker notifies this listener in the event + * of any change (add/delete). Listener then updates the routing table. + */ + private class BrokerRouteChangeListener + implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener{ @Override - public void close() throws Exception { - stop(); - } + public void onRouteChange(RouteChange routeChange) { - public void setRoutingTableProvider(RoutingTableProvider provider) { - this.routingTable = provider; } + } }