BUG-6937: Add ReachableMember case to Gossiper
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 51609870cc4aad1c8789dfdcd0b68f04563b5cdf..fc5b618663988d304d6b51468d2da9f99250d7b2 100644 (file)
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.actor.Address;
+import akka.actor.Cancellable;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.japi.Option;
 import akka.japi.Pair;
-import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
+ *
  * <p>
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
- *
  */
-public class RpcRegistry extends UntypedActor {
-
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
-    /**
-     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
-     */
-    private ActorRef bucketStore;
-
-    /**
-     * Rpc broker that would use the registry to route requests.
-     */
-    private ActorRef localRouter;
-
-    public RpcRegistry() {
-        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
+public class RpcRegistry extends BucketStore<RoutingTable> {
+    private final Set<Runnable> routesUpdatedCallbacks = new HashSet<>();
+    private final FiniteDuration findRouterTimeout;
+
+    public RpcRegistry(RemoteRpcProviderConfig config) {
+        super(config);
+        getLocalBucket().setData(new RoutingTable());
+        findRouterTimeout = getConfig().getGossipTickInterval().$times(10);
     }
 
-    public RpcRegistry(ActorRef bucketStore) {
-        this.bucketStore = bucketStore;
+    public static Props props(RemoteRpcProviderConfig config) {
+        return Props.create(RpcRegistry.class, config);
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: message [{}]", message);
-
+    protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
 
-        if (message instanceof SetLocalRouter)
+        if (message instanceof SetLocalRouter) {
             receiveSetLocalRouter((SetLocalRouter) message);
-
-        if (message instanceof AddOrUpdateRoute)
-            receiveAddRoute((AddOrUpdateRoute) message);
-
-        else if (message instanceof RemoveRoute)
-            receiveRemoveRoute((RemoveRoute) message);
-
-        else if (message instanceof Messages.FindRouters)
-            receiveGetRouter((Messages.FindRouters) message);
-
-        else
-            unhandled(message);
+        } else if (message instanceof AddOrUpdateRoutes) {
+            receiveAddRoutes((AddOrUpdateRoutes) message);
+        } else if (message instanceof RemoveRoutes) {
+            receiveRemoveRoutes((RemoveRoutes) message);
+        } else if (message instanceof Messages.FindRouters) {
+            receiveGetRouter((FindRouters) message);
+        } else if (message instanceof Runnable) {
+            ((Runnable)message).run();
+        } else {
+            super.handleReceive(message);
+        }
     }
 
     /**
-     * Register's rpc broker
+     * Registers a rpc broker.
      *
      * @param message contains {@link akka.actor.ActorRef} for rpc broker
      */
     private void receiveSetLocalRouter(SetLocalRouter message) {
-        if (message == null || message.getRouter() == null)
-            return;//ignore
-
-        localRouter = message.getRouter();
+        getLocalBucket().getData().setRouter(message.getRouter());
     }
 
-    /**
-     * //TODO: update this to accept multiple route registration
-     * @param msg
-     */
-    private void receiveAddRoute(AddOrUpdateRoute msg) {
-        if (msg.getRouteIdentifier() == null)
-            return;//ignore
-
-        Preconditions.checkState(localRouter != null, "Router must be set first");
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
-        futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
-    }
-
-    /**
-     * //TODO: update this to accept multiple routes
-     * @param msg
-     */
-    private void receiveRemoveRoute(RemoveRoute msg) {
-        if (msg.getRouteIdentifier() == null)
-            return;//ignore
+    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
-        futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
-
-    }
-
-    /**
-     * Finds routers for the given rpc.
-     * @param msg
-     */
-    private void receiveGetRouter(Messages.FindRouters msg) {
-        final ActorRef sender = getSender();
+        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
 
-        //if empty message, return empty list
-        if (msg.getRouteIdentifier() == null) {
-            sender.tell(createEmptyReply(), getSelf());
-            return;
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.addRoute(routeId);
         }
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
-        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
+        updateLocalBucket(table);
 
+        onBucketsUpdated();
     }
 
     /**
-     * Helper to create empty reply when no routers are found
+     * Processes a RemoveRoutes message.
      *
-     * @return
+     * @param msg contains list of route ids to remove
      */
-    private Messages.FindRoutersReply createEmptyReply() {
-        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
-        return new Messages.FindRoutersReply(routerWithUpdateTime);
-    }
-
-    /**
-     * Helper to create a reply when routers are found for the given rpc
-     * @param buckets
-     * @param routeId
-     * @return
-     */
-    private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
-        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
-        for (Bucket bucket : buckets.values()) {
-
-            RoutingTable table = (RoutingTable) bucket.getData();
-
-            if (table == null)
-                continue;
-
-            routerWithUpdateTime = table.getRouterFor(routeId);
+    private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-            if (routerWithUpdateTime.isEmpty())
-                continue;
-
-            routers.add(routerWithUpdateTime.get());
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.removeRoute(routeId);
         }
 
-        return new Messages.FindRoutersReply(routers);
+        updateLocalBucket(table);
     }
 
-
-    ///
-    ///private factories to create Mapper
-    ///
-
     /**
-     *  Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
+     * Finds routers for the given rpc.
      *
-     * @param routeId the rpc
-     * @param sender  client who asked to find the routers.
-     * @return
+     * @param findRouters the FindRouters request
      */
-    private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-
-                if (replyMessage instanceof GetAllBucketsReply) {
+    private void receiveGetRouter(final FindRouters findRouters) {
+        log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
 
-                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
-                    Map<Address, Bucket> buckets = reply.getBuckets();
-
-                    if (buckets == null || buckets.isEmpty()) {
-                        sender.tell(createEmptyReply(), getSelf());
-                        return null;
+        final ActorRef sender = getSender();
+        if (!findRouters(findRouters, sender)) {
+            log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
+                    findRouterTimeout.toMillis());
+
+            final AtomicReference<Cancellable> timer = new AtomicReference<>();
+            final Runnable routesUpdatedRunnable = new Runnable() {
+                @Override
+                public void run() {
+                    if (findRouters(findRouters, sender)) {
+                        routesUpdatedCallbacks.remove(this);
+                        timer.get().cancel();
                     }
-
-                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
                 }
-                return null;
-            }
-        };
-    }
+            };
 
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeId rpc to remote
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
+            routesUpdatedCallbacks.add(routesUpdatedRunnable);
 
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
+            Runnable timerRunnable = () -> {
+                log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
 
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
+                routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+                sender.tell(new Messages.FindRoutersReply(
+                        Collections.<Pair<ActorRef, Long>>emptyList()), self());
+            };
 
-                    table.setRouter(localRouter);
-                    table.removeRoute(routeId);
-
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
-                return null;
-            }
-        };
+            timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
+                    getContext().dispatcher(), self()));
+        }
     }
 
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeId rpc to add
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
+    private boolean findRouters(FindRouters findRouters, ActorRef sender) {
+        List<Pair<ActorRef, Long>> routers = new ArrayList<>();
 
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
+        RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
+        findRoutes(getLocalBucket().getData(), routeId, routers);
 
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
+        for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+            findRoutes(bucket.getData(), routeId, routers);
+        }
 
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
+        log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
 
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
+        boolean foundRouters = !routers.isEmpty();
+        if (foundRouters) {
+            sender.tell(new Messages.FindRoutersReply(routers), getSelf());
+        }
 
-                    table.setRouter(localRouter);
-                    table.addRoute(routeId);
+        return foundRouters;
+    }
 
-                    bucket.setData(table);
+    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+            List<Pair<ActorRef, Long>> routers) {
+        if (table == null) {
+            return;
+        }
 
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+        if (!routerWithUpdateTime.isEmpty()) {
+            routers.add(routerWithUpdateTime.get());
+        }
+    }
 
-                return null;
-            }
-        };
+    @Override
+    protected void onBucketsUpdated() {
+        if (routesUpdatedCallbacks.isEmpty()) {
+            return;
+        }
+
+        for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
+            callBack.run();
+        }
     }
 
     /**
-     * All messages used by the RpcRegistry
+     * All messages used by the RpcRegistry.
      */
     public static class Messages {
 
 
         public static class ContainsRoute {
-            final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
+            final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
 
-            public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                Preconditions.checkArgument(routeIdentifier != null);
-                this.routeIdentifier = routeIdentifier;
+            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+                        "Route Identifiers must be supplied");
+                this.routeIdentifiers = routeIdentifiers;
             }
 
-            public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
-                return this.routeIdentifier;
+            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+                return this.routeIdentifiers;
             }
 
             @Override
             public String toString() {
-                return this.getClass().getSimpleName() + "{" +
-                        "routeIdentifier=" + routeIdentifier +
-                        '}';
+                return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
             }
         }
 
-        public static class AddOrUpdateRoute extends ContainsRoute{
+        public static class AddOrUpdateRoutes extends ContainsRoute {
 
-            public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                super(routeIdentifier);
+            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                super(routeIdentifiers);
             }
         }
 
-        public static class RemoveRoute extends ContainsRoute {
+        public static class RemoveRoutes extends ContainsRoute {
 
-            public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                super(routeIdentifier);
+            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                super(routeIdentifiers);
             }
         }
 
-        public static class SetLocalRouter{
+        public static class SetLocalRouter {
             private final ActorRef router;
 
             public SetLocalRouter(ActorRef router) {
+                Preconditions.checkArgument(router != null, "Router must not be null");
                 this.router = router;
             }
 
-            public ActorRef getRouter(){
+            public ActorRef getRouter() {
                 return this.router;
             }
 
             @Override
             public String toString() {
-                return "SetLocalRouter{" +
-                        "router=" + router +
-                        '}';
+                return "SetLocalRouter{" + "router=" + router + '}';
             }
         }
 
-        public static class FindRouters extends ContainsRoute {
+        public static class FindRouters {
+            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+
             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                super(routeIdentifier);
+                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
+                this.routeIdentifier = routeIdentifier;
+            }
+
+            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
+                return routeIdentifier;
+            }
+
+            @Override
+            public String toString() {
+                return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
             }
         }
 
@@ -367,18 +269,17 @@ public class RpcRegistry extends UntypedActor {
             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
 
             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
                 this.routerWithUpdateTime = routerWithUpdateTime;
             }
 
-            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
+            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
                 return routerWithUpdateTime;
             }
 
             @Override
             public String toString() {
-                return "FindRoutersReply{" +
-                        "routerWithUpdateTime=" + routerWithUpdateTime +
-                        '}';
+                return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
             }
         }
     }