Remove unneded RoutingTable time tracking
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index e36060cc13ece309f04f10adebdb74a62f158146..c8415cc818733ff67bcc9828eaf96eceac916f7c 100644 (file)
  */
 package org.opendaylight.controller.remote.rpc.registry;
 
-import akka.actor.ActorSelection;
+import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.cluster.ClusterEvent;
-import akka.cluster.Member;
-import akka.japi.Creator;
-import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
-import org.opendaylight.controller.remote.rpc.ActorConstants;
-import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.AddRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.GetRpc;
-import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
-import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
-import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
-
-import java.util.LinkedHashSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Random;
+import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 /**
- * This Actor maintains the routing table state and sync it with other nodes in the cluster.
+ * Registry to look up cluster nodes that have registered for a given RPC.
  *
- * A scheduler runs after an interval of time, which pick a random member from the cluster
- * and send the current state of routing table to the member.
- *
- * when a message of routing table data is received, it gets merged with the local routing table
- * to keep the latest data.
+ * <p>
+ * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
+ * cluster wide information.
  */
+public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final ActorRef rpcRegistrar;
 
-public class RpcRegistry extends AbstractUntypedActor {
-
-  private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
-  private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
-  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
-  private final ClusterWrapper clusterWrapper;
-  private final ScheduledFuture<?> syncScheduler;
-
-  private RpcRegistry(ClusterWrapper clusterWrapper){
-    this.routingTable = new RoutingTable<>();
-    this.clusterWrapper = clusterWrapper;
-    this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
-  }
-
-  public static Props props(final ClusterWrapper clusterWrapper){
-    return Props.create(new Creator<RpcRegistry>(){
-
-      @Override
-      public RpcRegistry create() throws Exception {
-        return new RpcRegistry(clusterWrapper);
-      }
-    });
-  }
-
-  @Override
-  protected void handleReceive(Object message) throws Exception {
-    LOG.debug("Received message {}", message);
-    if(message instanceof RoutingTableData) {
-      syncRoutingTable((RoutingTableData) message);
-    } else if(message instanceof GetRoutedRpc) {
-      getRoutedRpc((GetRoutedRpc) message);
-    } else if(message instanceof GetRpc) {
-      getRpc((GetRpc) message);
-    } else if(message instanceof AddRpc) {
-      addRpc((AddRpc) message);
-    } else if(message instanceof RemoveRpc) {
-      removeRpc((RemoveRpc) message);
-    } else if(message instanceof AddRoutedRpc) {
-      addRoutedRpc((AddRoutedRpc) message);
-    } else if(message instanceof RemoveRoutedRpc) {
-      removeRoutedRpc((RemoveRoutedRpc) message);
+    public RpcRegistry(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker, final ActorRef rpcRegistrar) {
+        super(config, new RoutingTable(rpcInvoker));
+        this.rpcRegistrar = Preconditions.checkNotNull(rpcRegistrar);
     }
-  }
-
-  private void getRoutedRpc(GetRoutedRpc rpcMsg){
-    LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
-    String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
-    GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
 
-    getSender().tell(routedRpcReply, self());
-  }
+    /**
+     * Create a new props instance for instantiating an RpcRegistry actor.
+     *
+     * @param config Provider configuration
+     * @param rpcRegistrar Local RPC provider interface, used to register routers to remote nodes
+     * @param rpcInvoker Actor handling RPC invocation requests from remote nodes
+     * @return A new {@link Props} instance
+     */
+    public static Props props(final RemoteRpcProviderConfig config, final ActorRef rpcInvoker,
+            final ActorRef rpcRegistrar) {
+        return Props.create(RpcRegistry.class, config, rpcInvoker, rpcRegistrar);
+    }
 
-  private void getRpc(GetRpc rpcMsg) {
-    LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
-    String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
-    GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
+    @Override
+    protected void handleReceive(final Object message) throws Exception {
+        if (message instanceof AddOrUpdateRoutes) {
+            receiveAddRoutes((AddOrUpdateRoutes) message);
+        } else if (message instanceof RemoveRoutes) {
+            receiveRemoveRoutes((RemoveRoutes) message);
+        } else {
+            super.handleReceive(message);
+        }
+    }
 
-    getSender().tell(rpcReply, self());
-  }
+    private void receiveAddRoutes(final AddOrUpdateRoutes msg) {
+        LOG.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalBucket().getData().addRpcs(msg.getRouteIdentifiers()));
+    }
 
-  private void addRpc(AddRpc rpcMsg) {
-    LOG.debug("Add Rpc to routing table {}", rpcMsg);
-    routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
+    /**
+     * Processes a RemoveRoutes message.
+     *
+     * @param msg contains list of route ids to remove
+     */
+    private void receiveRemoveRoutes(final RemoveRoutes msg) {
+        LOG.debug("RemoveRoutes: {}", msg.getRouteIdentifiers());
+        updateLocalBucket(getLocalBucket().getData().removeRpcs(msg.getRouteIdentifiers()));
+    }
 
-    getSender().tell("Success", self());
-  }
+    @Override
+    protected void onBucketRemoved(final Address address, final Bucket<RoutingTable> bucket) {
+        rpcRegistrar.tell(new UpdateRemoteEndpoints(ImmutableMap.of(address, Optional.empty())), ActorRef.noSender());
+    }
 
-  private void removeRpc(RemoveRpc rpcMsg) {
-    LOG.debug("Removing Rpc to routing table {}", rpcMsg);
-    routingTable.removeGlobalRoute(rpcMsg.getRouteId());
+    @Override
+    protected void onBucketsUpdated(final Map<Address, Bucket<RoutingTable>> buckets) {
+        final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = new HashMap<>(buckets.size());
+
+        for (Entry<Address, Bucket<RoutingTable>> e : buckets.entrySet()) {
+            final RoutingTable table = e.getValue().getData();
+
+            final List<DOMRpcIdentifier> rpcs = new ArrayList<>(table.getRoutes().size());
+            for (RouteIdentifier<?, ?, ?> ri : table.getRoutes()) {
+                if (ri instanceof RouteIdentifierImpl) {
+                    final RouteIdentifierImpl id = (RouteIdentifierImpl) ri;
+                    rpcs.add(DOMRpcIdentifier.create(SchemaPath.create(true, id.getType()), id.getRoute()));
+                } else {
+                    LOG.warn("Skipping unsupported route {} from {}", ri, e.getKey());
+                }
+            }
+
+            endpoints.put(e.getKey(), rpcs.isEmpty() ? Optional.empty()
+                    : Optional.of(new RemoteRpcEndpoint(table.getRpcInvoker(), rpcs)));
+        }
 
-    getSender().tell("Success", self());
-  }
+        if (!endpoints.isEmpty()) {
+            rpcRegistrar.tell(new UpdateRemoteEndpoints(endpoints), ActorRef.noSender());
+        }
+    }
 
-  private void addRoutedRpc(AddRoutedRpc rpcMsg) {
-    routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
-    getSender().tell("Success", self());
-  }
+    public static final class RemoteRpcEndpoint {
+        private final Set<DOMRpcIdentifier> rpcs;
+        private final ActorRef router;
 
-  private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
-    routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
-    getSender().tell("Success", self());
-  }
+        RemoteRpcEndpoint(final ActorRef router, final Collection<DOMRpcIdentifier> rpcs) {
+            this.router = Preconditions.checkNotNull(router);
+            this.rpcs = ImmutableSet.copyOf(rpcs);
+        }
 
-  private void syncRoutingTable(RoutingTableData routingTableData) {
-    LOG.debug("Syncing routing table {}", routingTableData);
+        public ActorRef getRouter() {
+            return router;
+        }
 
-    Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
-    Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
-    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-      routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
+        public Set<DOMRpcIdentifier> getRpcs() {
+            return rpcs;
+        }
     }
 
-    Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
-        routingTableData.getRoutedRpcMap();
-    routeIds = newRoutedRpcMap.keySet();
+    /**
+     * All messages used by the RpcRegistry.
+     */
+    public static class Messages {
+        abstract static class AbstractRouteMessage {
+            final List<RouteIdentifier<?, ?, ?>> routeIdentifiers;
+
+            AbstractRouteMessage(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+                        "Route Identifiers must be supplied");
+                this.routeIdentifiers = routeIdentifiers;
+            }
+
+            List<RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+                return this.routeIdentifiers;
+            }
+
+            @Override
+            public String toString() {
+                return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
+            }
+        }
 
-    for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-      Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
-      for(String routeAddress : routeAddresses) {
-        routingTable.addRoutedRpc(routeId, routeAddress);
-      }
-    }
-  }
-
-  private ActorSelection getRandomRegistryActor() {
-    ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
-    ActorSelection actor = null;
-    Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
-    int memberSize = members.size();
-    // Don't select yourself
-    if(memberSize > 1) {
-      Address currentNodeAddress = clusterWrapper.getAddress();
-      int index = new Random().nextInt(memberSize);
-      int i = 0;
-      // keeping previous member, in case when random index member is same as current actor
-      // and current actor member is last in set
-      Member previousMember = null;
-      for(Member member : members){
-        if(i == index-1) {
-          previousMember = member;
+        public static final class AddOrUpdateRoutes extends AbstractRouteMessage {
+            public AddOrUpdateRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                super(routeIdentifiers);
+            }
         }
-        if(i == index) {
-          if(!currentNodeAddress.equals(member.address())) {
-            actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
-            break;
-          } else if(index < memberSize-1){ // pick the next element in the set
-            index++;
-          }
+
+        public static final class RemoveRoutes extends AbstractRouteMessage {
+            public RemoveRoutes(final List<RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                super(routeIdentifiers);
+            }
         }
-        i++;
-      }
-      if(actor == null && previousMember != null) {
-        actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
-      }
-    }
-    return actor;
-  }
 
-  private class SendRoutingTable implements Runnable {
+        public static final class UpdateRemoteEndpoints {
+            private final Map<Address, Optional<RemoteRpcEndpoint>> endpoints;
 
-    @Override
-    public void run() {
-      RoutingTableData routingTableData =
-          new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
-      LOG.debug("Sending routing table for sync {}", routingTableData);
-      ActorSelection actor = getRandomRegistryActor();
-      if(actor != null) {
-        actor.tell(routingTableData, self());
-      }
+            UpdateRemoteEndpoints(final Map<Address, Optional<RemoteRpcEndpoint>> endpoints) {
+                this.endpoints = ImmutableMap.copyOf(endpoints);
+            }
+
+            public Map<Address, Optional<RemoteRpcEndpoint>> getEndpoints() {
+                return endpoints;
+            }
+        }
     }
-  }
 }