Enhancements to remote rpc server. Using zmq router-dealer bridge to make the server...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImpl.java
index 83b93858cff392c7ab02e2b9cc0499c45b5798dc..b5a67ff0df97f3d110f1842074617468ce836b61 100644 (file)
 package org.opendaylight.controller.sal.connector.remoterpc;
 
 import com.google.common.base.Optional;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Sets;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
-import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling
- * async and non-blocking. Note zmq socket is not thread safe 2. Read properties
- * from config file using existing(?) ODL properties framework
+ * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
+ * so that it gets route change notifications from routing table.
  */
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, Set> {
+public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
 
-    private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+  private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
 
-    private ExecutorService serverPool;
+  private ExecutorService serverPool;
+  protected ServerRequestHandler handler;
 
-    // private RoutingTable<RpcRouter.RouteIdentifier, String> routingTable;
-    private RoutingTableProvider routingTable;
-    private Set<QName> remoteServices;
-    private ProviderSession brokerSession;
-    private ZMQ.Context context;
-    private ZMQ.Socket replySocket;
+  private Set<QName> remoteServices;
+  private ProviderSession brokerSession;
+  private ZMQ.Context context;
 
-    private final RpcListener listener = new RpcListener();
+  private final RpcListener listener = new RpcListener();
 
-    private final String localUri = Context.getInstance().getLocalUri();
+  private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
+  private final int HANDLER_WORKER_COUNT = 2;
+  private final int HWM = 200;//high water mark on sockets
+  private volatile State status = State.STOPPED;
 
-    private final int rpcPort;
+  private String serverAddress;
+  private int port;
 
-    private RpcImplementation client;
+  private ClientImpl client;
 
-    public RpcImplementation getClient() {
-        return client;
-    }
+  private  RoutingTableProvider routingTableProvider;
 
-    public void setClient(RpcImplementation client) {
-        this.client = client;
-    }
+  public static enum State {
+    STARTING, STARTED, STOPPED;
+  }
 
-    // Prevent instantiation
-    public ServerImpl(int rpcPort) {
-        this.rpcPort = rpcPort;
-    }
+  public ServerImpl(int port) {
+    this.port = port;
+    this.serverAddress = new StringBuilder(findIpAddress()).
+                              append(":").
+                              append(port).
+                              toString();
+  }
 
-    public void setBrokerSession(ProviderSession session) {
-        this.brokerSession = session;
-    }
+  public RoutingTableProvider getRoutingTableProvider() {
+    return routingTableProvider;
+  }
 
-    public ExecutorService getServerPool() {
-        return serverPool;
-    }
+  public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+    this.routingTableProvider = routingTableProvider;
+  }
 
-    public void setServerPool(ExecutorService serverPool) {
-        this.serverPool = serverPool;
-    }
+  public ClientImpl getClient(){
+    return this.client;
+  }
 
-    public void start() {
-        context = ZMQ.context(1);
-        serverPool = Executors.newSingleThreadExecutor();
-        remoteServices = new HashSet<QName>();
+  public void setClient(ClientImpl client) {
+    this.client = client;
+  }
 
-        // Start listening rpc requests
-        serverPool.execute(receive());
+  public State getStatus() {
+    return this.status;
+  }
 
-        brokerSession.addRpcRegistrationListener(listener);
-        // routingTable.registerRouteChangeListener(routeChangeListener);
+  public Optional<ServerRequestHandler> getHandler() {
+    return Optional.fromNullable(this.handler);
+  }
 
-        Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-        for (QName rpc : currentlySupported) {
-            listener.onRpcImplementationAdded(rpc);
-        }
+  public void setBrokerSession(ProviderSession session) {
+    this.brokerSession = session;
+  }
 
-        _logger.debug("RPC Server started [{}]", localUri);
-    }
+  public Optional<ProviderSession> getBrokerSession() {
+    return Optional.fromNullable(this.brokerSession);
+  }
 
-    public void stop() {
-        // TODO: un-subscribe
+  public Optional<ZMQ.Context> getZmqContext() {
+    return Optional.fromNullable(this.context);
+  }
 
-        // if (context != null)
-        // context.term();
-        //
-        // _logger.debug("ZMQ Context is terminated.");
+  public String getServerAddress() {
+    return serverAddress;
+  }
 
-        if (serverPool != null)
-            serverPool.shutdown();
+  public String getHandlerAddress() {
+    return HANDLER_INPROC_ADDRESS;
+  }
 
-        _logger.debug("Thread pool is closed.");
-    }
+  /**
+   *
+   */
+  public void start() {
+    Preconditions.checkState(State.STOPPED == this.getStatus(),
+        "Remote RPC Server is already running");
 
-    private Runnable receive() {
-        return new Runnable() {
-            public void run() {
-
-                // Bind to RPC reply socket
-                replySocket = context.socket(ZMQ.REP);
-                replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort());
-
-                // Poller enables listening on multiple sockets using a single
-                // thread
-                ZMQ.Poller poller = new ZMQ.Poller(1);
-                poller.register(replySocket, ZMQ.Poller.POLLIN);
-                try {
-                    // TODO: Add code to restart the thread after exception
-                    while (!Thread.currentThread().isInterrupted()) {
-
-                        poller.poll();
-
-                        if (poller.pollin(0)) {
-                            handleRpcCall();
-                        }
-                    }
-                } catch (Exception e) {
-                    // log and continue
-                    _logger.error("Unhandled exception [{}]", e);
-                } finally {
-                    poller.unregister(replySocket);
-                    replySocket.close();
-                }
-
-            }
-        };
-    }
+    status = State.STARTING;
+    context = ZMQ.context(1);
+    remoteServices = new HashSet<QName>();//
+    serverPool = Executors.newSingleThreadExecutor();//main server thread
+    serverPool.execute(receive()); // Start listening rpc requests
+    brokerSession.addRpcRegistrationListener(listener);
 
-    /**
-     * @throws InterruptedException
-     * @throws ExecutionException
-     */
-    private void handleRpcCall() {
+    announceLocalRpcs();
 
-        Message request = parseMessage(replySocket);
+    registerRemoteRpcs();
 
-        _logger.debug("Received rpc request [{}]", request);
+    status = State.STARTED;
+    _logger.info("Remote RPC Server started [{}]", getServerAddress());
+  }
 
-        // Call broker to process the message then reply
-        Future<RpcResult<CompositeNode>> rpc = null;
-        RpcResult<CompositeNode> result = null;
-        try {
-            rpc = brokerSession.rpc((QName) request.getRoute().getType(),
-                    XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+  public void stop(){
+    close();
+  }
 
-            result = (rpc != null) ? rpc.get() : null;
+  /**
+   *
+   */
+  @Override
+  public void close() {
 
-        } catch (Exception e) {
-            _logger.debug("Broker threw  [{}]", e);
-        }
+    if (State.STOPPED == this.getStatus()) return; //do nothing
+
+    unregisterLocalRpcs();
 
-        CompositeNode payload = (result != null) ? result.getResult() : null;
+    if (serverPool != null)
+      serverPool.shutdown();
 
-        Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri)
-                .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build();
+    closeZmqContext();
 
-        _logger.debug("Sending rpc response [{}]", response);
+    status = State.STOPPED;
+    _logger.info("Remote RPC Server stopped");
+  }
+
+  /**
+   * Closes ZMQ Context. It tries to gracefully terminate the context. If
+   * termination takes more than a second, its forcefully shutdown.
+   */
+  private void closeZmqContext() {
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    FutureTask zmqTermination = new FutureTask(new Runnable() {
 
+      @Override
+      public void run() {
         try {
-            replySocket.send(Message.serialize(response));
+          if (context != null)
+            context.term();
+          _logger.debug("ZMQ Context terminated gracefully!");
         } catch (Exception e) {
-            _logger.debug("rpc response send failed for message [{}]", response);
-            _logger.debug("{}", e);
+          _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
         }
+      }
+    }, null);
+
+    exec.execute(zmqTermination);
+
+    try {
+      zmqTermination.get(5L, TimeUnit.SECONDS);
+    } catch (Exception e) {/*ignore and continue with shutdown*/}
+
+    exec.shutdownNow();
+  }
+
+  /**
+   * Main listener thread that spawns {@link ServerRequestHandler} as workers.
+   *
+   * @return
+   */
+  private Runnable receive() {
+    return new Runnable() {
+
+      @Override
+      public void run() {
+        Thread.currentThread().setName("remote-rpc-server");
+        _logger.debug("Remote RPC Server main thread starting...");
+
+        //socket clients connect to (frontend)
+        ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
+
+        //socket RequestHandlers connect to (backend)
+        ZMQ.Socket workers = context.socket(ZMQ.DEALER);
+
+        try (SocketPair capturePair = new SocketPair();
+             ServerRequestHandler requestHandler = new ServerRequestHandler(context,
+                 brokerSession,
+                 HANDLER_WORKER_COUNT,
+                 HANDLER_INPROC_ADDRESS,
+                 getServerAddress());) {
+
+          handler = requestHandler;
+          clients.setHWM(HWM);
+          clients.bind("tcp://*:" + port);
+          workers.setHWM(HWM);
+          workers.bind(HANDLER_INPROC_ADDRESS);
+          //start worker threads
+          _logger.debug("Remote RPC Server worker threads starting...");
+          requestHandler.start();
+          //start capture thread
+          // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
+          //  Connect work threads to client threads via a queue
+          ZMQ.proxy(clients, workers, null);//capturePair.getSender());
 
+        } catch (Exception e) {
+          _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
+        } finally {
+          if (clients != null) clients.close();
+          if (workers != null) workers.close();
+          _logger.info("Remote RPC Server stopped");
+        }
+      }
+    };
+  }
+
+  /**
+   * Register the remote RPCs from the routing table into broker
+   */
+  private void registerRemoteRpcs(){
+    Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
+
+    Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
+
+    Set<Map.Entry> remoteRoutes =
+            routingTableProvider.getRoutingTable().get().getAllRoutes();
+
+    //filter out all entries that contains local address
+    //we dont want to register local RPCs as remote
+    Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
+      public boolean apply(Map.Entry remoteRoute){
+        return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
+      }
+    };
+
+    //filter the entries created by current node
+    Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
+
+    for (Map.Entry route : filteredRemoteRoutes){
+      onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
+    }
+  }
+
+  /**
+   * Un-Register the local RPCs from the routing table
+   */
+  private void unregisterLocalRpcs(){
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for (QName rpc : currentlySupported) {
+      listener.onRpcImplementationRemoved(rpc);
+    }
+  }
+
+  /**
+   * Publish all the locally registered RPCs in the routing table
+   */
+  private void announceLocalRpcs(){
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for (QName rpc : currentlySupported) {
+      listener.onRpcImplementationAdded(rpc);
+    }
+  }
+
+  /**
+   * @param key
+   * @param value
+   */
+  @Override
+  public void onRouteUpdated(String key, String value) {
+    RouteIdentifierImpl rId = new RouteIdentifierImpl();
+    try {
+      _logger.debug("Updating key/value {}-{}", key, value);
+      brokerSession.addRpcImplementation(
+          (QName) rId.fromString(key).getType(), client);
+
+      //TODO: Check with Tony for routed rpc
+      //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
+    } catch (Exception e) {
+      _logger.info("Route update failed {}", e);
     }
+  }
+
+  /**
+   * @param key
+   */
+  @Override
+  public void onRouteDeleted(String key) {
+    //TODO: Broker session needs to be updated to support this
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Finds IPv4 address of the local VM
+   * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
+   * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
+   * Should we use IP or hostname?
+   *
+   * @return
+   */
+  private String findIpAddress() {
+    String hostAddress = null;
+    Enumeration e = null;
+    try {
+      e = NetworkInterface.getNetworkInterfaces();
+    } catch (SocketException e1) {
+      e1.printStackTrace();
+    }
+    while (e.hasMoreElements()) {
 
-    /**
-     * @param socket
-     * @return
-     */
-    private Message parseMessage(ZMQ.Socket socket) {
+      NetworkInterface n = (NetworkInterface) e.nextElement();
 
-        Message msg = null;
-        try {
-            byte[] bytes = socket.recv();
-            _logger.debug("Received bytes:[{}]", bytes.length);
-            msg = (Message) Message.deserialize(bytes);
-        } catch (Throwable t) {
-            t.printStackTrace();
-        }
-        return msg;
+      Enumeration ee = n.getInetAddresses();
+      while (ee.hasMoreElements()) {
+        InetAddress i = (InetAddress) ee.nextElement();
+        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+          hostAddress = i.getHostAddress();
+      }
     }
+    return hostAddress;
+
+  }
+
+  /**
+   * Listener for rpc registrations
+   */
+  private class RpcListener implements RpcRegistrationListener {
 
     @Override
-    public void onRouteUpdated(String key, Set values) {
-        RouteIdentifierImpl rId = new RouteIdentifierImpl();
-        try {
-            _logger.debug("Updating key/value {}-{}", key, values);
-            brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client);
+    public void onRpcImplementationAdded(QName name) {
 
-        } catch (Exception e) {
-            _logger.info("Route update failed {}", e);
-        }
+      //if the service name exists in the set, this notice
+      //has bounced back from the broker. It should be ignored
+      if (remoteServices.contains(name))
+        return;
+
+      _logger.debug("Adding registration for [{}]", name);
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(name);
+
+      RoutingTable<String, String> routingTable = getRoutingTable();
+
+      try {
+        routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
+        _logger.debug("Route added [{}-{}]", name, getServerAddress());
+
+      } catch (RoutingTableException | SystemException e) {
+        //TODO: This can be thrown when route already exists in the table. Broker
+        //needs to handle this.
+        _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+
+      }
     }
 
     @Override
-    public void onRouteDeleted(String key) {
-        // TODO: Broker session needs to be updated to support this
-        throw new UnsupportedOperationException();
+    public void onRpcImplementationRemoved(QName name) {
+
+      _logger.debug("Removing registration for [{}]", name);
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(name);
+
+      RoutingTable<String, String> routingTable = getRoutingTable();
+
+      try {
+        routingTable.removeGlobalRoute(routeId.toString());
+      } catch (RoutingTableException | SystemException e) {
+        _logger.error("Route delete failed {}", e);
+      }
     }
-    
-    /**
-     * Listener for rpc registrations
-     */
-    private class RpcListener implements RpcRegistrationListener {
-
-        
-
-        @Override
-        public void onRpcImplementationAdded(QName name) {
-
-            // if the service name exists in the set, this notice
-            // has bounced back from the broker. It should be ignored
-            if (remoteServices.contains(name))
-                return;
-
-            _logger.debug("Adding registration for [{}]", name);
-            RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-            routeId.setType(name);
-
-            try {
-                routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri);
-                _logger.debug("Route added [{}-{}]", name, localUri);
-            } catch (RoutingTableException | SystemException e) {
-                // TODO: This can be thrown when route already exists in the
-                // table. Broker
-                // needs to handle this.
-                _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
-            }
-        }
 
-        @Override
-        public void onRpcImplementationRemoved(QName name) {
+    private RoutingTable<String, String> getRoutingTable(){
+      Optional<RoutingTable<String, String>> routingTable =
+          routingTableProvider.getRoutingTable();
 
-            _logger.debug("Removing registration for [{}]", name);
-            RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-            routeId.setType(name);
+      checkNotNull(routingTable.isPresent(), "Routing table is null");
 
-            try {
-                routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString());
-            } catch (RoutingTableException | SystemException e) {
-                _logger.error("Route delete failed {}", e);
-            }
-        }
+      return routingTable.get();
     }
+  }
+
+  /*
+   * Listener for Route changes in broker. Broker notifies this listener in the event
+   * of any change (add/delete). Listener then updates the routing table.
+   */
+  private class BrokerRouteChangeListener
+      implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
 
     @Override
-    public void close() throws Exception {
-        stop();
-    }
+    public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
 
-    public void setRoutingTableProvider(RoutingTableProvider provider) {
-        this.routingTable = provider;
     }
+  }
 
 }