BUG-6937: Add ReachableMember case to Gossiper
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 095d70926b90d3838a777cc14a6976b31ccd9c97..fc5b618663988d304d6b51468d2da9f99250d7b2 100644 (file)
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.actor.Address;
+import akka.actor.Cancellable;
 import akka.actor.Props;
-import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Option;
 import akka.japi.Pair;
-import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
- * <p/>
+ *
+ * <p>
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends AbstractUntypedActorWithMetering {
-
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
-    /**
-     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
-     */
-    private ActorRef bucketStore;
-
-    /**
-     * Rpc broker that would use the registry to route requests.
-     */
-    private ActorRef localRouter;
-
-    private RemoteRpcProviderConfig config;
-
-    public RpcRegistry() {
-        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-        this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
-        log.info("Bucket store path = {}", bucketStore.path().toString());
+public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+    private final FiniteDuration findRouterTimeout;
+
+    public RpcRegistry(RemoteRpcProviderConfig config) {
+        super(config);
+        getLocalBucket().setData(new RoutingTable());
+        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
     }
 
-    public RpcRegistry(ActorRef bucketStore) {
-        this.bucketStore = bucketStore;
+    public static Props props(RemoteRpcProviderConfig config) {
+        return Props.create(RpcRegistry.class, config);
     }
 
-
     @Override
     protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
 
-        if (message instanceof SetLocalRouter)
+        if (message instanceof SetLocalRouter) {
             receiveSetLocalRouter((SetLocalRouter) message);
-
-        if (message instanceof AddOrUpdateRoutes)
+        } else if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
-
-        else if (message instanceof RemoveRoutes)
+        } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-
-        else if (message instanceof Messages.FindRouters)
+        } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
-
-        else
-            unhandled(message);
+        } else if (message instanceof Runnable) {
+            ((Runnable)message).run();
+        } else {
+            super.handleReceive(message);
+        }
     }
 
     /**
-     * Register's rpc broker
+     * Registers a rpc broker.
      *
      * @param message contains {@link akka.actor.ActorRef} for rpc broker
      */
     private void receiveSetLocalRouter(SetLocalRouter message) {
-        localRouter = message.getRouter();
+        getLocalBucket().getData().setRouter(message.getRouter());
     }
 
-    /**
-     * @param msg
-     */
     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
 
-        Preconditions.checkState(localRouter != null, "Router must be set first");
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
-        futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
-    }
+        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
 
-    /**
-     * @param msg contains list of route ids to remove
-     */
-    private void receiveRemoveRoutes(RemoveRoutes msg) {
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.addRoute(routeId);
+        }
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
-        futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+        updateLocalBucket(table);
 
+        onBucketsUpdated();
     }
 
     /**
-     * Finds routers for the given rpc.
+     * Processes a RemoveRoutes message.
      *
-     * @param msg
+     * @param msg contains list of route ids to remove
      */
-    private void receiveGetRouter(FindRouters msg) {
-        final ActorRef sender = getSender();
+    private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
-        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
-    }
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.removeRoute(routeId);
+        }
 
-    /**
-     * Helper to create empty reply when no routers are found
-     *
-     * @return
-     */
-    private Messages.FindRoutersReply createEmptyReply() {
-        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
-        return new Messages.FindRoutersReply(routerWithUpdateTime);
+        updateLocalBucket(table);
     }
 
     /**
-     * Helper to create a reply when routers are found for the given rpc
+     * Finds routers for the given rpc.
      *
-     * @param buckets
-     * @param routeId
-     * @return
+     * @param findRouters the FindRouters request
      */
-    private Messages.FindRoutersReply createReplyWithRouters(
-            Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+    private void receiveGetRouter(final FindRouters findRouters) {
+        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
 
-        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
+        final ActorRef sender = getSender();
+        if (!findRouters(findRouters, sender)) {
+            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+                    findRouterTimeout.toMillis());
+
+            final AtomicReference<Cancellable> timer = new AtomicReference<>();
+            final Runnable routesUpdatedRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    if (findRouters(findRouters, sender)) {
+                        routesUpdatedCallbacks.remove(this);
+                        timer.get().cancel();
+                    }
+                }
+            };
 
-        for (Bucket bucket : buckets.values()) {
+            routesUpdatedCallbacks.add(routesUpdatedRunnable);
 
-            RoutingTable table = (RoutingTable) bucket.getData();
-            if (table == null)
-                continue;
+            Runnable timerRunnable = () -> {
+                log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
 
-            routerWithUpdateTime = table.getRouterFor(routeId);
-            if (routerWithUpdateTime.isEmpty())
-                continue;
+                routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+                sender.tell(new Messages.FindRoutersReply(
+                        Collections.<Pair<ActorRef, Long>>emptyList()), self());
+            };
 
-            routers.add(routerWithUpdateTime.get());
+            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+                    getContext().dispatcher(), self()));
         }
-
-        return new Messages.FindRoutersReply(routers);
     }
 
+    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
+        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
 
-    ///
-    ///private factories to create Mapper
-    ///
-
-    /**
-     * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
-     *
-     * @param routeId the rpc
-     * @param sender  client who asked to find the routers.
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToGetRouter(
-            final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
+        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
+        findRoutes(getLocalBucket().getData(), routeId, routers);
 
-                if (replyMessage instanceof GetAllBucketsReply) {
+        for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+            findRoutes(bucket.getData(), routeId, routers);
+        }
 
-                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
-                    Map<Address, Bucket> buckets = reply.getBuckets();
+        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
 
-                    if (buckets == null || buckets.isEmpty()) {
-                        sender.tell(createEmptyReply(), getSelf());
-                        return null;
-                    }
+        boolean foundRouters = !routers.isEmpty();
+        if (foundRouters) {
+            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        }
 
-                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
-                }
-                return null;
-            }
-        };
+        return foundRouters;
     }
 
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to remote
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-
-                    if (!table.isEmpty()) {
-                        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                            table.removeRoute(routeId);
-                        }
-                    }
-                    bucket.setData(table);
+    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+            List<Pair<ActorRef, Long>> routers) {
+        if (table == null) {
+            return;
+        }
 
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
-                return null;
-            }
-        };
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+        if (!routerWithUpdateTime.isEmpty()) {
+            routers.add(routerWithUpdateTime.get());
+        }
     }
 
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to add
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-                    for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                        table.addRoute(routeId);
-                    }
-
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
+    @Override
+    protected void onBucketsUpdated() {
+        if (routesUpdatedCallbacks.isEmpty()) {
+            return;
+        }
 
-                return null;
-            }
-        };
+        for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
+            callBack.run();
+        }
     }
 
     /**
-     * All messages used by the RpcRegistry
+     * All messages used by the RpcRegistry.
      */
     public static class Messages {
 
@@ -302,9 +200,8 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
             final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
 
             public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
-                Preconditions.checkArgument(routeIdentifiers != null &&
-                                            !routeIdentifiers.isEmpty(),
-                                            "Route Identifiers must be supplied");
+                Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+                        "Route Identifiers must be supplied");
                 this.routeIdentifiers = routeIdentifiers;
             }
 
@@ -314,9 +211,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
 
             @Override
             public String toString() {
-                return "ContainsRoute{" +
-                        "routeIdentifiers=" + routeIdentifiers +
-                        '}';
+                return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
             }
         }
 
@@ -348,9 +243,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
 
             @Override
             public String toString() {
-                return "SetLocalRouter{" +
-                        "router=" + router +
-                        '}';
+                return "SetLocalRouter{" + "router=" + router + '}';
             }
         }
 
@@ -368,9 +261,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
 
             @Override
             public String toString() {
-                return "FindRouters{" +
-                        "routeIdentifier=" + routeIdentifier +
-                        '}';
+                return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
             }
         }
 
@@ -388,9 +279,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
 
             @Override
             public String toString() {
-                return "FindRoutersReply{" +
-                        "routerWithUpdateTime=" + routerWithUpdateTime +
-                        '}';
+                return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
             }
         }
     }