Bug 2526: Race condition may cause missing routes in RPC BucketStore
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
index 51609870cc4aad1c8789dfdcd0b68f04563b5cdf..845c1c819a70ca6bac0fb1a717b31f7861b6a6b6 100644 (file)
@@ -8,85 +8,51 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Option;
 import akka.japi.Pair;
-import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoute;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
- * <p>
+ * <p/>
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
- *
  */
-public class RpcRegistry extends UntypedActor {
+public class RpcRegistry extends BucketStore<RoutingTable> {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-    /**
-     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
-     */
-    private ActorRef bucketStore;
-
-    /**
-     * Rpc broker that would use the registry to route requests.
-     */
-    private ActorRef localRouter;
-
     public RpcRegistry() {
-        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-    }
-
-    public RpcRegistry(ActorRef bucketStore) {
-        this.bucketStore = bucketStore;
+        getLocalBucket().setData(new RoutingTable());
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
-
-        log.debug("Received message: message [{}]", message);
-
+    protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
 
-        if (message instanceof SetLocalRouter)
+        if (message instanceof SetLocalRouter) {
             receiveSetLocalRouter((SetLocalRouter) message);
-
-        if (message instanceof AddOrUpdateRoute)
-            receiveAddRoute((AddOrUpdateRoute) message);
-
-        else if (message instanceof RemoveRoute)
-            receiveRemoveRoute((RemoveRoute) message);
-
-        else if (message instanceof Messages.FindRouters)
-            receiveGetRouter((Messages.FindRouters) message);
-
-        else
-            unhandled(message);
+        } else if (message instanceof AddOrUpdateRoutes) {
+            receiveAddRoutes((AddOrUpdateRoutes) message);
+        } else if (message instanceof RemoveRoutes) {
+            receiveRemoveRoutes((RemoveRoutes) message);
+        } else if (message instanceof Messages.FindRouters) {
+            receiveGetRouter((FindRouters) message);
+        } else {
+            super.handleReceive(message);
+        }
     }
 
     /**
@@ -95,207 +61,65 @@ public class RpcRegistry extends UntypedActor {
      * @param message contains {@link akka.actor.ActorRef} for rpc broker
      */
     private void receiveSetLocalRouter(SetLocalRouter message) {
-        if (message == null || message.getRouter() == null)
-            return;//ignore
-
-        localRouter = message.getRouter();
+        getLocalBucket().getData().setRouter(message.getRouter());
     }
 
     /**
-     * //TODO: update this to accept multiple route registration
      * @param msg
      */
-    private void receiveAddRoute(AddOrUpdateRoute msg) {
-        if (msg.getRouteIdentifier() == null)
-            return;//ignore
+    private void receiveAddRoutes(AddOrUpdateRoutes msg) {
 
-        Preconditions.checkState(localRouter != null, "Router must be set first");
+        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
-        futureReply.map(getMapperToAddRoute(msg.getRouteIdentifier()), getContext().dispatcher());
-    }
-
-    /**
-     * //TODO: update this to accept multiple routes
-     * @param msg
-     */
-    private void receiveRemoveRoute(RemoveRoute msg) {
-        if (msg.getRouteIdentifier() == null)
-            return;//ignore
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
-        futureReply.map(getMapperToRemoveRoute(msg.getRouteIdentifier()), getContext().dispatcher());
+        RoutingTable table = getLocalBucket().getData().copy();
+        for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.addRoute(routeId);
+        }
 
+        updateLocalBucket(table);
     }
 
     /**
-     * Finds routers for the given rpc.
-     * @param msg
+     * @param msg contains list of route ids to remove
      */
-    private void receiveGetRouter(Messages.FindRouters msg) {
-        final ActorRef sender = getSender();
+    private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-        //if empty message, return empty list
-        if (msg.getRouteIdentifier() == null) {
-            sender.tell(createEmptyReply(), getSelf());
-            return;
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.removeRoute(routeId);
         }
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
-        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
-
+        updateLocalBucket(table);
     }
 
     /**
-     * Helper to create empty reply when no routers are found
+     * Finds routers for the given rpc.
      *
-     * @return
-     */
-    private Messages.FindRoutersReply createEmptyReply() {
-        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
-        return new Messages.FindRoutersReply(routerWithUpdateTime);
-    }
-
-    /**
-     * Helper to create a reply when routers are found for the given rpc
-     * @param buckets
-     * @param routeId
-     * @return
+     * @param msg
      */
-    private Messages.FindRoutersReply createReplyWithRouters(Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
+    private void receiveGetRouter(FindRouters msg) {
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
 
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
-        for (Bucket bucket : buckets.values()) {
-
-            RoutingTable table = (RoutingTable) bucket.getData();
-
-            if (table == null)
-                continue;
-
-            routerWithUpdateTime = table.getRouterFor(routeId);
+        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+        findRoutes(getLocalBucket().getData(), routeId, routers);
 
-            if (routerWithUpdateTime.isEmpty())
-                continue;
-
-            routers.add(routerWithUpdateTime.get());
+        for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+            findRoutes(bucket.getData(), routeId, routers);
         }
 
-        return new Messages.FindRoutersReply(routers);
+        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
     }
 
+    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+            List<Pair<ActorRef, Long>> routers) {
+        if (table == null) {
+            return;
+        }
 
-    ///
-    ///private factories to create Mapper
-    ///
-
-    /**
-     *  Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
-     *
-     * @param routeId the rpc
-     * @param sender  client who asked to find the routers.
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToGetRouter(final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-
-                if (replyMessage instanceof GetAllBucketsReply) {
-
-                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
-                    Map<Address, Bucket> buckets = reply.getBuckets();
-
-                    if (buckets == null || buckets.isEmpty()) {
-                        sender.tell(createEmptyReply(), getSelf());
-                        return null;
-                    }
-
-                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeId rpc to remote
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToRemoveRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-                    table.removeRoute(routeId);
-
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeId rpc to add
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToAddRoute(final RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-                    table.addRoute(routeId);
-
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
-
-                return null;
-            }
-        };
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+        if(!routerWithUpdateTime.isEmpty()) {
+            routers.add(routerWithUpdateTime.get());
+        }
     }
 
     /**
@@ -305,47 +129,50 @@ public class RpcRegistry extends UntypedActor {
 
 
         public static class ContainsRoute {
-            final RpcRouter.RouteIdentifier<?,?,?> routeIdentifier;
+            final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
 
-            public ContainsRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                Preconditions.checkArgument(routeIdentifier != null);
-                this.routeIdentifier = routeIdentifier;
+            public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                Preconditions.checkArgument(routeIdentifiers != null &&
+                                            !routeIdentifiers.isEmpty(),
+                                            "Route Identifiers must be supplied");
+                this.routeIdentifiers = routeIdentifiers;
             }
 
-            public RpcRouter.RouteIdentifier<?,?,?> getRouteIdentifier(){
-                return this.routeIdentifier;
+            public List<RpcRouter.RouteIdentifier<?, ?, ?>> getRouteIdentifiers() {
+                return this.routeIdentifiers;
             }
 
             @Override
             public String toString() {
-                return this.getClass().getSimpleName() + "{" +
-                        "routeIdentifier=" + routeIdentifier +
+                return "ContainsRoute{" +
+                        "routeIdentifiers=" + routeIdentifiers +
                         '}';
             }
         }
 
-        public static class AddOrUpdateRoute extends ContainsRoute{
+        public static class AddOrUpdateRoutes extends ContainsRoute {
 
-            public AddOrUpdateRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                super(routeIdentifier);
+            public AddOrUpdateRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                super(routeIdentifiers);
             }
         }
 
-        public static class RemoveRoute extends ContainsRoute {
+        public static class RemoveRoutes extends ContainsRoute {
 
-            public RemoveRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                super(routeIdentifier);
+            public RemoveRoutes(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
+                super(routeIdentifiers);
             }
         }
 
-        public static class SetLocalRouter{
+        public static class SetLocalRouter {
             private final ActorRef router;
 
             public SetLocalRouter(ActorRef router) {
+                Preconditions.checkArgument(router != null, "Router must not be null");
                 this.router = router;
             }
 
-            public ActorRef getRouter(){
+            public ActorRef getRouter() {
                 return this.router;
             }
 
@@ -357,9 +184,23 @@ public class RpcRegistry extends UntypedActor {
             }
         }
 
-        public static class FindRouters extends ContainsRoute {
+        public static class FindRouters {
+            private final RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier;
+
             public FindRouters(RpcRouter.RouteIdentifier<?, ?, ?> routeIdentifier) {
-                super(routeIdentifier);
+                Preconditions.checkArgument(routeIdentifier != null, "Route must not be null");
+                this.routeIdentifier = routeIdentifier;
+            }
+
+            public RpcRouter.RouteIdentifier<?, ?, ?> getRouteIdentifier() {
+                return routeIdentifier;
+            }
+
+            @Override
+            public String toString() {
+                return "FindRouters{" +
+                        "routeIdentifier=" + routeIdentifier +
+                        '}';
             }
         }
 
@@ -367,10 +208,11 @@ public class RpcRegistry extends UntypedActor {
             final List<Pair<ActorRef, Long>> routerWithUpdateTime;
 
             public FindRoutersReply(List<Pair<ActorRef, Long>> routerWithUpdateTime) {
+                Preconditions.checkArgument(routerWithUpdateTime != null, "List of routers found must not be null");
                 this.routerWithUpdateTime = routerWithUpdateTime;
             }
 
-            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime(){
+            public List<Pair<ActorRef, Long>> getRouterWithUpdateTime() {
                 return routerWithUpdateTime;
             }