Bug 2526: Race condition may cause missing routes in RPC BucketStore
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 095d70926b90d3838a777cc14a6976b31ccd9c97..845c1c819a70ca6bac0fb1a717b31f7861b6a6b6 100644 (file)
@@ -8,36 +8,21 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Option;
 import akka.japi.Pair;
-import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
@@ -45,51 +30,29 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends AbstractUntypedActorWithMetering {
+public class RpcRegistry extends BucketStore<RoutingTable> {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-    /**
-     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
-     */
-    private ActorRef bucketStore;
-
-    /**
-     * Rpc broker that would use the registry to route requests.
-     */
-    private ActorRef localRouter;
-
-    private RemoteRpcProviderConfig config;
-
     public RpcRegistry() {
-        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-        this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
-        log.info("Bucket store path = {}", bucketStore.path().toString());
+        getLocalBucket().setData(new RoutingTable());
     }
 
-    public RpcRegistry(ActorRef bucketStore) {
-        this.bucketStore = bucketStore;
-    }
-
-
     @Override
     protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
 
-        if (message instanceof SetLocalRouter)
+        if (message instanceof SetLocalRouter) {
             receiveSetLocalRouter((SetLocalRouter) message);
-
-        if (message instanceof AddOrUpdateRoutes)
+        } else if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
-
-        else if (message instanceof RemoveRoutes)
+        } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-
-        else if (message instanceof Messages.FindRouters)
+        } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
-
-        else
-            unhandled(message);
+        } else {
+            super.handleReceive(message);
+        }
     }
 
     /**
@@ -98,7 +61,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      * @param message contains {@link akka.actor.ActorRef} for rpc broker
      */
     private void receiveSetLocalRouter(SetLocalRouter message) {
-        localRouter = message.getRouter();
+        getLocalBucket().getData().setRouter(message.getRouter());
     }
 
     /**
@@ -106,10 +69,14 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      */
     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
 
-        Preconditions.checkState(localRouter != null, "Router must be set first");
+        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+
+        RoutingTable table = getLocalBucket().getData().copy();
+        for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.addRoute(routeId);
+        }
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
-        futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+        updateLocalBucket(table);
     }
 
     /**
@@ -117,9 +84,12 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      */
     private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
-        futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.removeRoute(routeId);
+        }
 
+        updateLocalBucket(table);
     }
 
     /**
@@ -128,168 +98,28 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      * @param msg
      */
     private void receiveGetRouter(FindRouters msg) {
-        final ActorRef sender = getSender();
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
-        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
-    }
-
-    /**
-     * Helper to create empty reply when no routers are found
-     *
-     * @return
-     */
-    private Messages.FindRoutersReply createEmptyReply() {
-        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
-        return new Messages.FindRoutersReply(routerWithUpdateTime);
-    }
-
-    /**
-     * Helper to create a reply when routers are found for the given rpc
-     *
-     * @param buckets
-     * @param routeId
-     * @return
-     */
-    private Messages.FindRoutersReply createReplyWithRouters(
-            Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
-        for (Bucket bucket : buckets.values()) {
-
-            RoutingTable table = (RoutingTable) bucket.getData();
-            if (table == null)
-                continue;
 
-            routerWithUpdateTime = table.getRouterFor(routeId);
-            if (routerWithUpdateTime.isEmpty())
-                continue;
+        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+        findRoutes(getLocalBucket().getData(), routeId, routers);
 
-            routers.add(routerWithUpdateTime.get());
+        for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+            findRoutes(bucket.getData(), routeId, routers);
         }
 
-        return new Messages.FindRoutersReply(routers);
-    }
-
-
-    ///
-    ///private factories to create Mapper
-    ///
-
-    /**
-     * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
-     *
-     * @param routeId the rpc
-     * @param sender  client who asked to find the routers.
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToGetRouter(
-            final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-
-                if (replyMessage instanceof GetAllBucketsReply) {
-
-                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
-                    Map<Address, Bucket> buckets = reply.getBuckets();
-
-                    if (buckets == null || buckets.isEmpty()) {
-                        sender.tell(createEmptyReply(), getSelf());
-                        return null;
-                    }
-
-                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to remote
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-
-                    if (!table.isEmpty()) {
-                        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                            table.removeRoute(routeId);
-                        }
-                    }
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
-                return null;
-            }
-        };
+        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
     }
 
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to add
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-                    for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                        table.addRoute(routeId);
-                    }
-
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
+    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+            List<Pair<ActorRef, Long>> routers) {
+        if (table == null) {
+            return;
+        }
 
-                return null;
-            }
-        };
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+        if(!routerWithUpdateTime.isEmpty()) {
+            routers.add(routerWithUpdateTime.get());
+        }
     }
 
     /**