Implement finding a primary based on the shard name and do basic wiring of Distribute...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImpl.java
index b5a67ff0df97f3d110f1842074617468ce836b61..3acea356ceb4747feb48a3ae6126e0435708f74f 100644 (file)
@@ -7,47 +7,33 @@
  */
 package org.opendaylight.controller.sal.connector.remoterpc;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Sets;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
-import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.util.Enumeration;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 /**
- * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
- * so that it gets route change notifications from routing table.
+ * ZeroMq based implementation of RpcRouter.
  */
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
+public class ServerImpl implements RemoteRpcServer {
 
-  private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+  private final Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
 
   private ExecutorService serverPool;
   protected ServerRequestHandler handler;
@@ -56,19 +42,13 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
   private ProviderSession brokerSession;
   private ZMQ.Context context;
 
-  private final RpcListener listener = new RpcListener();
-
   private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
   private final int HANDLER_WORKER_COUNT = 2;
   private final int HWM = 200;//high water mark on sockets
   private volatile State status = State.STOPPED;
 
   private String serverAddress;
-  private int port;
-
-  private ClientImpl client;
-
-  private  RoutingTableProvider routingTableProvider;
+  private final int port;
 
   public static enum State {
     STARTING, STARTED, STOPPED;
@@ -76,26 +56,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
 
   public ServerImpl(int port) {
     this.port = port;
-    this.serverAddress = new StringBuilder(findIpAddress()).
-                              append(":").
-                              append(port).
-                              toString();
-  }
-
-  public RoutingTableProvider getRoutingTableProvider() {
-    return routingTableProvider;
-  }
-
-  public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
-    this.routingTableProvider = routingTableProvider;
-  }
-
-  public ClientImpl getClient(){
-    return this.client;
-  }
-
-  public void setClient(ClientImpl client) {
-    this.client = client;
   }
 
   public State getStatus() {
@@ -134,15 +94,32 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
         "Remote RPC Server is already running");
 
     status = State.STARTING;
+    _logger.debug("Remote RPC Server is starting...");
+
+    String hostIpAddress = findIpAddress();
+
+    //Log and silently die as per discussion in the bug (bug-362)
+    //https://bugs.opendaylight.org/show_bug.cgi?id=362
+    //
+    // A tracking enhancement defect (bug-366) is created to properly fix this issue
+    //https://bugs.opendaylight.org/show_bug.cgi?id=366
+    //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
+
+    if (hostIpAddress == null) {
+      _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
+      stop();
+      return;
+    }
+
+    this.serverAddress = new StringBuilder(hostIpAddress).
+        append(":").
+        append(port).
+        toString();
+
     context = ZMQ.context(1);
     remoteServices = new HashSet<QName>();//
     serverPool = Executors.newSingleThreadExecutor();//main server thread
     serverPool.execute(receive()); // Start listening rpc requests
-    brokerSession.addRpcRegistrationListener(listener);
-
-    announceLocalRpcs();
-
-    registerRemoteRpcs();
 
     status = State.STARTED;
     _logger.info("Remote RPC Server started [{}]", getServerAddress());
@@ -160,8 +137,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
 
     if (State.STOPPED == this.getStatus()) return; //do nothing
 
-    unregisterLocalRpcs();
-
     if (serverPool != null)
       serverPool.shutdown();
 
@@ -173,11 +148,11 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
 
   /**
    * Closes ZMQ Context. It tries to gracefully terminate the context. If
-   * termination takes more than a second, its forcefully shutdown.
+   * termination takes more than 5 seconds, its forcefully shutdown.
    */
   private void closeZmqContext() {
     ExecutorService exec = Executors.newSingleThreadExecutor();
-    FutureTask zmqTermination = new FutureTask(new Runnable() {
+    FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
 
       @Override
       public void run() {
@@ -250,81 +225,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     };
   }
 
-  /**
-   * Register the remote RPCs from the routing table into broker
-   */
-  private void registerRemoteRpcs(){
-    Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
-
-    Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
-
-    Set<Map.Entry> remoteRoutes =
-            routingTableProvider.getRoutingTable().get().getAllRoutes();
-
-    //filter out all entries that contains local address
-    //we dont want to register local RPCs as remote
-    Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
-      public boolean apply(Map.Entry remoteRoute){
-        return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
-      }
-    };
-
-    //filter the entries created by current node
-    Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
-
-    for (Map.Entry route : filteredRemoteRoutes){
-      onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
-    }
-  }
-
-  /**
-   * Un-Register the local RPCs from the routing table
-   */
-  private void unregisterLocalRpcs(){
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      listener.onRpcImplementationRemoved(rpc);
-    }
-  }
-
-  /**
-   * Publish all the locally registered RPCs in the routing table
-   */
-  private void announceLocalRpcs(){
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      listener.onRpcImplementationAdded(rpc);
-    }
-  }
-
-  /**
-   * @param key
-   * @param value
-   */
-  @Override
-  public void onRouteUpdated(String key, String value) {
-    RouteIdentifierImpl rId = new RouteIdentifierImpl();
-    try {
-      _logger.debug("Updating key/value {}-{}", key, value);
-      brokerSession.addRpcImplementation(
-          (QName) rId.fromString(key).getType(), client);
-
-      //TODO: Check with Tony for routed rpc
-      //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
-    } catch (Exception e) {
-      _logger.info("Route update failed {}", e);
-    }
-  }
-
-  /**
-   * @param key
-   */
-  @Override
-  public void onRouteDeleted(String key) {
-    //TODO: Broker session needs to be updated to support this
-    throw new UnsupportedOperationException();
-  }
-
   /**
    * Finds IPv4 address of the local VM
    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
@@ -334,96 +234,31 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
    * @return
    */
   private String findIpAddress() {
-    String hostAddress = null;
-    Enumeration e = null;
+    Enumeration<?> e = null;
     try {
       e = NetworkInterface.getNetworkInterfaces();
     } catch (SocketException e1) {
-      e1.printStackTrace();
+      _logger.error("Failed to get list of interfaces", e1);
+      return null;
     }
     while (e.hasMoreElements()) {
 
       NetworkInterface n = (NetworkInterface) e.nextElement();
 
-      Enumeration ee = n.getInetAddresses();
+      Enumeration<?> ee = n.getInetAddresses();
       while (ee.hasMoreElements()) {
         InetAddress i = (InetAddress) ee.nextElement();
-        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
-          hostAddress = i.getHostAddress();
-      }
-    }
-    return hostAddress;
-
-  }
-
-  /**
-   * Listener for rpc registrations
-   */
-  private class RpcListener implements RpcRegistrationListener {
-
-    @Override
-    public void onRpcImplementationAdded(QName name) {
-
-      //if the service name exists in the set, this notice
-      //has bounced back from the broker. It should be ignored
-      if (remoteServices.contains(name))
-        return;
-
-      _logger.debug("Adding registration for [{}]", name);
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(name);
-
-      RoutingTable<String, String> routingTable = getRoutingTable();
-
-      try {
-        routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
-        _logger.debug("Route added [{}-{}]", name, getServerAddress());
-
-      } catch (RoutingTableException | SystemException e) {
-        //TODO: This can be thrown when route already exists in the table. Broker
-        //needs to handle this.
-        _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
-      }
-    }
-
-    @Override
-    public void onRpcImplementationRemoved(QName name) {
-
-      _logger.debug("Removing registration for [{}]", name);
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(name);
-
-      RoutingTable<String, String> routingTable = getRoutingTable();
-
-      try {
-        routingTable.removeGlobalRoute(routeId.toString());
-      } catch (RoutingTableException | SystemException e) {
-        _logger.error("Route delete failed {}", e);
+        _logger.debug("Trying address {}", i);
+        if ((i instanceof Inet4Address) && (!i.isLoopbackAddress())) {
+          String hostAddress = i.getHostAddress();
+          _logger.debug("Settled on host address {}", hostAddress);
+          return hostAddress;
+        }
       }
     }
 
-    private RoutingTable<String, String> getRoutingTable(){
-      Optional<RoutingTable<String, String>> routingTable =
-          routingTableProvider.getRoutingTable();
-
-      checkNotNull(routingTable.isPresent(), "Routing table is null");
-
-      return routingTable.get();
-    }
-  }
-
-  /*
-   * Listener for Route changes in broker. Broker notifies this listener in the event
-   * of any change (add/delete). Listener then updates the routing table.
-   */
-  private class BrokerRouteChangeListener
-      implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
-
-    @Override
-    public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
-
-    }
+    _logger.error("Failed to find a suitable host address");
+    return null;
   }
 
 }