Implement finding a primary based on the shard name and do basic wiring of Distribute...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / ServerImpl.java
index 5c14dd0c453ce8365e528506d743699edf418f3a..3acea356ceb4747feb48a3ae6126e0435708f74f 100644 (file)
@@ -7,48 +7,33 @@
  */
 package org.opendaylight.controller.sal.connector.remoterpc;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Sets;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
-import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.net.SocketException;
 import java.util.Enumeration;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 /**
- * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
- * so that it gets route change notifications from routing table.
+ * ZeroMq based implementation of RpcRouter.
  */
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
+public class ServerImpl implements RemoteRpcServer {
 
-  private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+  private final Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
 
   private ExecutorService serverPool;
   protected ServerRequestHandler handler;
@@ -57,19 +42,13 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
   private ProviderSession brokerSession;
   private ZMQ.Context context;
 
-  private final RpcListener listener = new RpcListener();
-
   private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
   private final int HANDLER_WORKER_COUNT = 2;
   private final int HWM = 200;//high water mark on sockets
   private volatile State status = State.STOPPED;
 
   private String serverAddress;
-  private int port;
-
-  private ClientImpl client;
-
-  private  RoutingTableProvider routingTableProvider;
+  private final int port;
 
   public static enum State {
     STARTING, STARTED, STOPPED;
@@ -79,22 +58,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     this.port = port;
   }
 
-  public RoutingTableProvider getRoutingTableProvider() {
-    return routingTableProvider;
-  }
-
-  public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
-    this.routingTableProvider = routingTableProvider;
-  }
-
-  public ClientImpl getClient(){
-    return this.client;
-  }
-
-  public void setClient(ClientImpl client) {
-    this.client = client;
-  }
-
   public State getStatus() {
     return this.status;
   }
@@ -157,11 +120,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     remoteServices = new HashSet<QName>();//
     serverPool = Executors.newSingleThreadExecutor();//main server thread
     serverPool.execute(receive()); // Start listening rpc requests
-    brokerSession.addRpcRegistrationListener(listener);
-
-    announceLocalRpcs();
-
-    registerRemoteRpcs();
 
     status = State.STARTED;
     _logger.info("Remote RPC Server started [{}]", getServerAddress());
@@ -179,8 +137,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
 
     if (State.STOPPED == this.getStatus()) return; //do nothing
 
-    unregisterLocalRpcs();
-
     if (serverPool != null)
       serverPool.shutdown();
 
@@ -192,11 +148,11 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
 
   /**
    * Closes ZMQ Context. It tries to gracefully terminate the context. If
-   * termination takes more than a second, its forcefully shutdown.
+   * termination takes more than 5 seconds, its forcefully shutdown.
    */
   private void closeZmqContext() {
     ExecutorService exec = Executors.newSingleThreadExecutor();
-    FutureTask zmqTermination = new FutureTask(new Runnable() {
+    FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
 
       @Override
       public void run() {
@@ -269,81 +225,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     };
   }
 
-  /**
-   * Register the remote RPCs from the routing table into broker
-   */
-  private void registerRemoteRpcs(){
-    Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
-
-    Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
-
-    Set<Map.Entry> remoteRoutes =
-            routingTableProvider.getRoutingTable().get().getAllRoutes();
-
-    //filter out all entries that contains local address
-    //we dont want to register local RPCs as remote
-    Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
-      public boolean apply(Map.Entry remoteRoute){
-        return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
-      }
-    };
-
-    //filter the entries created by current node
-    Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
-
-    for (Map.Entry route : filteredRemoteRoutes){
-      onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
-    }
-  }
-
-  /**
-   * Un-Register the local RPCs from the routing table
-   */
-  private void unregisterLocalRpcs(){
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      listener.onRpcImplementationRemoved(rpc);
-    }
-  }
-
-  /**
-   * Publish all the locally registered RPCs in the routing table
-   */
-  private void announceLocalRpcs(){
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      listener.onRpcImplementationAdded(rpc);
-    }
-  }
-
-  /**
-   * @param key
-   * @param value
-   */
-  @Override
-  public void onRouteUpdated(String key, String value) {
-    RouteIdentifierImpl rId = new RouteIdentifierImpl();
-    try {
-      _logger.debug("Updating key/value {}-{}", key, value);
-      brokerSession.addRpcImplementation(
-          (QName) rId.fromString(key).getType(), client);
-
-      //TODO: Check with Tony for routed rpc
-      //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
-    } catch (Exception e) {
-      _logger.info("Route update failed {}", e);
-    }
-  }
-
-  /**
-   * @param key
-   */
-  @Override
-  public void onRouteDeleted(String key) {
-    //TODO: Broker session needs to be updated to support this
-    throw new UnsupportedOperationException();
-  }
-
   /**
    * Finds IPv4 address of the local VM
    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
@@ -353,23 +234,22 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
    * @return
    */
   private String findIpAddress() {
-    Enumeration e = null;
+    Enumeration<?> e = null;
     try {
       e = NetworkInterface.getNetworkInterfaces();
     } catch (SocketException e1) {
       _logger.error("Failed to get list of interfaces", e1);
-      //throw new RuntimeException("Failed to acquire list of interfaces", e1);
       return null;
     }
     while (e.hasMoreElements()) {
 
       NetworkInterface n = (NetworkInterface) e.nextElement();
 
-      Enumeration ee = n.getInetAddresses();
+      Enumeration<?> ee = n.getInetAddresses();
       while (ee.hasMoreElements()) {
         InetAddress i = (InetAddress) ee.nextElement();
         _logger.debug("Trying address {}", i);
-        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) {
+        if ((i instanceof Inet4Address) && (!i.isLoopbackAddress())) {
           String hostAddress = i.getHostAddress();
           _logger.debug("Settled on host address {}", hostAddress);
           return hostAddress;
@@ -381,74 +261,4 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     return null;
   }
 
-  /**
-   * Listener for rpc registrations
-   */
-  private class RpcListener implements RpcRegistrationListener {
-
-    @Override
-    public void onRpcImplementationAdded(QName name) {
-
-      //if the service name exists in the set, this notice
-      //has bounced back from the broker. It should be ignored
-      if (remoteServices.contains(name))
-        return;
-
-      _logger.debug("Adding registration for [{}]", name);
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(name);
-
-      RoutingTable<String, String> routingTable = getRoutingTable();
-
-      try {
-        routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
-        _logger.debug("Route added [{}-{}]", name, getServerAddress());
-
-      } catch (RoutingTableException | SystemException e) {
-        //TODO: This can be thrown when route already exists in the table. Broker
-        //needs to handle this.
-        _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
-      }
-    }
-
-    @Override
-    public void onRpcImplementationRemoved(QName name) {
-
-      _logger.debug("Removing registration for [{}]", name);
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(name);
-
-      RoutingTable<String, String> routingTable = getRoutingTable();
-
-      try {
-        routingTable.removeGlobalRoute(routeId.toString());
-      } catch (RoutingTableException | SystemException e) {
-        _logger.error("Route delete failed {}", e);
-      }
-    }
-
-    private RoutingTable<String, String> getRoutingTable(){
-      Optional<RoutingTable<String, String>> routingTable =
-          routingTableProvider.getRoutingTable();
-
-      checkNotNull(routingTable.isPresent(), "Routing table is null");
-
-      return routingTable.get();
-    }
-  }
-
-  /*
-   * Listener for Route changes in broker. Broker notifies this listener in the event
-   * of any change (add/delete). Listener then updates the routing table.
-   */
-  private class BrokerRouteChangeListener
-      implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
-
-    @Override
-    public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
-
-    }
-  }
-
 }