X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fremoterpc%2FServerImpl.java;h=3acea356ceb4747feb48a3ae6126e0435708f74f;hp=83b93858cff392c7ab02e2b9cc0499c45b5798dc;hb=b6e3e11ddcea90a2ae7f93c179625941e8e22ccd;hpb=616a88111ea9603f0d6f93c7462e6dab39644fcf diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java index 83b93858cf..3acea356ce 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java @@ -7,279 +7,258 @@ */ package org.opendaylight.controller.sal.connector.remoterpc; -import com.google.common.base.Optional; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener; -import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; -import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; -import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType; -import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; -import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; /** - * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling - * async and non-blocking. Note zmq socket is not thread safe 2. Read properties - * from config file using existing(?) ODL properties framework + * ZeroMq based implementation of RpcRouter. */ -public class ServerImpl implements RemoteRpcServer, RouteChangeListener { - - private Logger _logger = LoggerFactory.getLogger(ServerImpl.class); +public class ServerImpl implements RemoteRpcServer { - private ExecutorService serverPool; + private final Logger _logger = LoggerFactory.getLogger(ServerImpl.class); - // private RoutingTable routingTable; - private RoutingTableProvider routingTable; - private Set remoteServices; - private ProviderSession brokerSession; - private ZMQ.Context context; - private ZMQ.Socket replySocket; + private ExecutorService serverPool; + protected ServerRequestHandler handler; - private final RpcListener listener = new RpcListener(); + private Set remoteServices; + private ProviderSession brokerSession; + private ZMQ.Context context; - private final String localUri = Context.getInstance().getLocalUri(); + private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler"; + private final int HANDLER_WORKER_COUNT = 2; + private final int HWM = 200;//high water mark on sockets + private volatile State status = State.STOPPED; - private final int rpcPort; + private String serverAddress; + private final int port; - private RpcImplementation client; + public static enum State { + STARTING, STARTED, STOPPED; + } - public RpcImplementation getClient() { - return client; - } + public ServerImpl(int port) { + this.port = port; + } - public void setClient(RpcImplementation client) { - this.client = client; - } + public State getStatus() { + return this.status; + } - // Prevent instantiation - public ServerImpl(int rpcPort) { - this.rpcPort = rpcPort; - } + public Optional getHandler() { + return Optional.fromNullable(this.handler); + } - public void setBrokerSession(ProviderSession session) { - this.brokerSession = session; - } + public void setBrokerSession(ProviderSession session) { + this.brokerSession = session; + } - public ExecutorService getServerPool() { - return serverPool; - } + public Optional getBrokerSession() { + return Optional.fromNullable(this.brokerSession); + } - public void setServerPool(ExecutorService serverPool) { - this.serverPool = serverPool; - } + public Optional getZmqContext() { + return Optional.fromNullable(this.context); + } - public void start() { - context = ZMQ.context(1); - serverPool = Executors.newSingleThreadExecutor(); - remoteServices = new HashSet(); + public String getServerAddress() { + return serverAddress; + } - // Start listening rpc requests - serverPool.execute(receive()); + public String getHandlerAddress() { + return HANDLER_INPROC_ADDRESS; + } - brokerSession.addRpcRegistrationListener(listener); - // routingTable.registerRouteChangeListener(routeChangeListener); + /** + * + */ + public void start() { + Preconditions.checkState(State.STOPPED == this.getStatus(), + "Remote RPC Server is already running"); - Set currentlySupported = brokerSession.getSupportedRpcs(); - for (QName rpc : currentlySupported) { - listener.onRpcImplementationAdded(rpc); - } - - _logger.debug("RPC Server started [{}]", localUri); - } + status = State.STARTING; + _logger.debug("Remote RPC Server is starting..."); - public void stop() { - // TODO: un-subscribe + String hostIpAddress = findIpAddress(); - // if (context != null) - // context.term(); - // - // _logger.debug("ZMQ Context is terminated."); + //Log and silently die as per discussion in the bug (bug-362) + //https://bugs.opendaylight.org/show_bug.cgi?id=362 + // + // A tracking enhancement defect (bug-366) is created to properly fix this issue + //https://bugs.opendaylight.org/show_bug.cgi?id=366 + //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address"); - if (serverPool != null) - serverPool.shutdown(); - - _logger.debug("Thread pool is closed."); + if (hostIpAddress == null) { + _logger.error("Remote RPC Server could not acquire host ip address. Stopping..."); + stop(); + return; } - private Runnable receive() { - return new Runnable() { - public void run() { - - // Bind to RPC reply socket - replySocket = context.socket(ZMQ.REP); - replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort()); - - // Poller enables listening on multiple sockets using a single - // thread - ZMQ.Poller poller = new ZMQ.Poller(1); - poller.register(replySocket, ZMQ.Poller.POLLIN); - try { - // TODO: Add code to restart the thread after exception - while (!Thread.currentThread().isInterrupted()) { - - poller.poll(); - - if (poller.pollin(0)) { - handleRpcCall(); - } - } - } catch (Exception e) { - // log and continue - _logger.error("Unhandled exception [{}]", e); - } finally { - poller.unregister(replySocket); - replySocket.close(); - } - - } - }; - } + this.serverAddress = new StringBuilder(hostIpAddress). + append(":"). + append(port). + toString(); - /** - * @throws InterruptedException - * @throws ExecutionException - */ - private void handleRpcCall() { + context = ZMQ.context(1); + remoteServices = new HashSet();// + serverPool = Executors.newSingleThreadExecutor();//main server thread + serverPool.execute(receive()); // Start listening rpc requests - Message request = parseMessage(replySocket); + status = State.STARTED; + _logger.info("Remote RPC Server started [{}]", getServerAddress()); + } - _logger.debug("Received rpc request [{}]", request); + public void stop(){ + close(); + } - // Call broker to process the message then reply - Future> rpc = null; - RpcResult result = null; - try { - rpc = brokerSession.rpc((QName) request.getRoute().getType(), - XmlUtils.xmlToCompositeNode((String) request.getPayload())); + /** + * + */ + @Override + public void close() { - result = (rpc != null) ? rpc.get() : null; + if (State.STOPPED == this.getStatus()) return; //do nothing - } catch (Exception e) { - _logger.debug("Broker threw [{}]", e); - } + if (serverPool != null) + serverPool.shutdown(); - CompositeNode payload = (result != null) ? result.getResult() : null; + closeZmqContext(); - Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri) - .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build(); + status = State.STOPPED; + _logger.info("Remote RPC Server stopped"); + } - _logger.debug("Sending rpc response [{}]", response); + /** + * Closes ZMQ Context. It tries to gracefully terminate the context. If + * termination takes more than 5 seconds, its forcefully shutdown. + */ + private void closeZmqContext() { + ExecutorService exec = Executors.newSingleThreadExecutor(); + FutureTask zmqTermination = new FutureTask(new Runnable() { + @Override + public void run() { try { - replySocket.send(Message.serialize(response)); + if (context != null) + context.term(); + _logger.debug("ZMQ Context terminated gracefully!"); } catch (Exception e) { - _logger.debug("rpc response send failed for message [{}]", response); - _logger.debug("{}", e); + _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e); } - - } - - /** - * @param socket - * @return - */ - private Message parseMessage(ZMQ.Socket socket) { - - Message msg = null; - try { - byte[] bytes = socket.recv(); - _logger.debug("Received bytes:[{}]", bytes.length); - msg = (Message) Message.deserialize(bytes); - } catch (Throwable t) { - t.printStackTrace(); - } - return msg; - } - - @Override - public void onRouteUpdated(String key, Set values) { - RouteIdentifierImpl rId = new RouteIdentifierImpl(); - try { - _logger.debug("Updating key/value {}-{}", key, values); - brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client); + } + }, null); + + exec.execute(zmqTermination); + + try { + zmqTermination.get(5L, TimeUnit.SECONDS); + } catch (Exception e) {/*ignore and continue with shutdown*/} + + exec.shutdownNow(); + } + + /** + * Main listener thread that spawns {@link ServerRequestHandler} as workers. + * + * @return + */ + private Runnable receive() { + return new Runnable() { + + @Override + public void run() { + Thread.currentThread().setName("remote-rpc-server"); + _logger.debug("Remote RPC Server main thread starting..."); + + //socket clients connect to (frontend) + ZMQ.Socket clients = context.socket(ZMQ.ROUTER); + + //socket RequestHandlers connect to (backend) + ZMQ.Socket workers = context.socket(ZMQ.DEALER); + + try (SocketPair capturePair = new SocketPair(); + ServerRequestHandler requestHandler = new ServerRequestHandler(context, + brokerSession, + HANDLER_WORKER_COUNT, + HANDLER_INPROC_ADDRESS, + getServerAddress());) { + + handler = requestHandler; + clients.setHWM(HWM); + clients.bind("tcp://*:" + port); + workers.setHWM(HWM); + workers.bind(HANDLER_INPROC_ADDRESS); + //start worker threads + _logger.debug("Remote RPC Server worker threads starting..."); + requestHandler.start(); + //start capture thread + // handlerPool.execute(new CaptureHandler(capturePair.getReceiver())); + // Connect work threads to client threads via a queue + ZMQ.proxy(clients, workers, null);//capturePair.getSender()); } catch (Exception e) { - _logger.info("Route update failed {}", e); + _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage()); + } finally { + if (clients != null) clients.close(); + if (workers != null) workers.close(); + _logger.info("Remote RPC Server stopped"); } + } + }; + } + + /** + * Finds IPv4 address of the local VM + * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which + * address will be returned. Read IP from a property file or enhance the code to make it deterministic. + * Should we use IP or hostname? + * + * @return + */ + private String findIpAddress() { + Enumeration e = null; + try { + e = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e1) { + _logger.error("Failed to get list of interfaces", e1); + return null; } - - @Override - public void onRouteDeleted(String key) { - // TODO: Broker session needs to be updated to support this - throw new UnsupportedOperationException(); - } - - /** - * Listener for rpc registrations - */ - private class RpcListener implements RpcRegistrationListener { - - - - @Override - public void onRpcImplementationAdded(QName name) { - - // if the service name exists in the set, this notice - // has bounced back from the broker. It should be ignored - if (remoteServices.contains(name)) - return; - - _logger.debug("Adding registration for [{}]", name); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(name); - - try { - routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri); - _logger.debug("Route added [{}-{}]", name, localUri); - } catch (RoutingTableException | SystemException e) { - // TODO: This can be thrown when route already exists in the - // table. Broker - // needs to handle this. - _logger.error("Unhandled exception while adding global route to routing table [{}]", e); - - } - } - - @Override - public void onRpcImplementationRemoved(QName name) { - - _logger.debug("Removing registration for [{}]", name); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(name); - - try { - routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString()); - } catch (RoutingTableException | SystemException e) { - _logger.error("Route delete failed {}", e); - } + while (e.hasMoreElements()) { + + NetworkInterface n = (NetworkInterface) e.nextElement(); + + Enumeration ee = n.getInetAddresses(); + while (ee.hasMoreElements()) { + InetAddress i = (InetAddress) ee.nextElement(); + _logger.debug("Trying address {}", i); + if ((i instanceof Inet4Address) && (!i.isLoopbackAddress())) { + String hostAddress = i.getHostAddress(); + _logger.debug("Settled on host address {}", hostAddress); + return hostAddress; } + } } - @Override - public void close() throws Exception { - stop(); - } - - public void setRoutingTableProvider(RoutingTableProvider provider) { - this.routingTable = provider; - } + _logger.error("Failed to find a suitable host address"); + return null; + } }