Bug 2526: Race condition may cause missing routes in RPC BucketStore 53/13753/7
authortpantelis <tpanteli@brocade.com>
Fri, 19 Dec 2014 08:45:43 +0000 (03:45 -0500)
committertpantelis <tpanteli@brocade.com>
Sat, 20 Dec 2014 02:14:41 +0000 (21:14 -0500)
Changed the RpcRegistry class to derive from BucketStore instead of
creating the BucketStore as a separate actor. This was done because the
copy-on-write update operations in the BucketStore need to be done
atomically so the RpcRegistry needs direct access to the BucketStore in
order to do this safely and efficiently. The RpcRegistry handles the
sematics of the Bucket data - this keeps the BucketStore data agnostic.

Change-Id: Ief96c28775b729f459d324971403222e5a578029
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java

index fe8c463..52b1106 100644 (file)
@@ -10,23 +10,22 @@ package org.opendaylight.controller.remote.rpc.registry;
 import akka.actor.ActorRef;
 import akka.japi.Option;
 import akka.japi.Pair;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
 public class RoutingTable implements Copier<RoutingTable>, Serializable {
     private static final long serialVersionUID = 1L;
 
-    private Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
+    private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
     private ActorRef router;
 
     @Override
     public RoutingTable copy() {
         RoutingTable copy = new RoutingTable();
-        copy.setTable(new HashMap<>(table));
+        copy.table.putAll(table);
         copy.setRouter(this.getRouter());
 
         return copy;
@@ -35,10 +34,11 @@ public class RoutingTable implements Copier<RoutingTable>, Serializable {
     public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
         Long updatedTime = table.get(routeId);
 
-        if (updatedTime == null || router == null)
+        if (updatedTime == null || router == null) {
             return Option.none();
-        else
+        } else {
             return Option.option(new Pair<>(router, updatedTime));
+        }
     }
 
     public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
@@ -49,23 +49,16 @@ public class RoutingTable implements Copier<RoutingTable>, Serializable {
         table.remove(routeId);
     }
 
-    public Boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+    public boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
         return table.containsKey(routeId);
     }
 
-    public Boolean isEmpty(){
+    public boolean isEmpty(){
         return table.isEmpty();
     }
-    ///
-    /// Getter, Setters
-    ///
-    //TODO: Remove public
-    public Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> getTable() {
-        return table;
-    }
 
-    void setTable(Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table) {
-        this.table = table;
+    public int size() {
+        return table.size();
     }
 
     public ActorRef getRouter() {
index 095d709..845c1c8 100644 (file)
@@ -8,36 +8,21 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
 import akka.actor.ActorRef;
-import akka.actor.Address;
-import akka.actor.Props;
-import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Option;
 import akka.japi.Pair;
-import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import scala.concurrent.Future;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
 /**
  * Registry to look up cluster nodes that have registered for a given rpc.
@@ -45,51 +30,29 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu
  * It uses {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} to maintain this
  * cluster wide information.
  */
-public class RpcRegistry extends AbstractUntypedActorWithMetering {
+public class RpcRegistry extends BucketStore<RoutingTable> {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
-    /**
-     * Store to keep the registry. Bucket store sync's it across nodes in the cluster
-     */
-    private ActorRef bucketStore;
-
-    /**
-     * Rpc broker that would use the registry to route requests.
-     */
-    private ActorRef localRouter;
-
-    private RemoteRpcProviderConfig config;
-
     public RpcRegistry() {
-        bucketStore = getContext().actorOf(Props.create(BucketStore.class), "store");
-        this.config = new RemoteRpcProviderConfig(getContext().system().settings().config());
-        log.info("Bucket store path = {}", bucketStore.path().toString());
+        getLocalBucket().setData(new RoutingTable());
     }
 
-    public RpcRegistry(ActorRef bucketStore) {
-        this.bucketStore = bucketStore;
-    }
-
-
     @Override
     protected void handleReceive(Object message) throws Exception {
         //TODO: if sender is remote, reject message
 
-        if (message instanceof SetLocalRouter)
+        if (message instanceof SetLocalRouter) {
             receiveSetLocalRouter((SetLocalRouter) message);
-
-        if (message instanceof AddOrUpdateRoutes)
+        } else if (message instanceof AddOrUpdateRoutes) {
             receiveAddRoutes((AddOrUpdateRoutes) message);
-
-        else if (message instanceof RemoveRoutes)
+        } else if (message instanceof RemoveRoutes) {
             receiveRemoveRoutes((RemoveRoutes) message);
-
-        else if (message instanceof Messages.FindRouters)
+        } else if (message instanceof Messages.FindRouters) {
             receiveGetRouter((FindRouters) message);
-
-        else
-            unhandled(message);
+        } else {
+            super.handleReceive(message);
+        }
     }
 
     /**
@@ -98,7 +61,7 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      * @param message contains {@link akka.actor.ActorRef} for rpc broker
      */
     private void receiveSetLocalRouter(SetLocalRouter message) {
-        localRouter = message.getRouter();
+        getLocalBucket().getData().setRouter(message.getRouter());
     }
 
     /**
@@ -106,10 +69,14 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      */
     private void receiveAddRoutes(AddOrUpdateRoutes msg) {
 
-        Preconditions.checkState(localRouter != null, "Router must be set first");
+        log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
+
+        RoutingTable table = getLocalBucket().getData().copy();
+        for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.addRoute(routeId);
+        }
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
-        futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+        updateLocalBucket(table);
     }
 
     /**
@@ -117,9 +84,12 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      */
     private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), config.getAskDuration());
-        futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
+        RoutingTable table = getLocalBucket().getData().copy();
+        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+            table.removeRoute(routeId);
+        }
 
+        updateLocalBucket(table);
     }
 
     /**
@@ -128,168 +98,28 @@ public class RpcRegistry extends AbstractUntypedActorWithMetering {
      * @param msg
      */
     private void receiveGetRouter(FindRouters msg) {
-        final ActorRef sender = getSender();
-
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), config.getAskDuration());
-        futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
-    }
-
-    /**
-     * Helper to create empty reply when no routers are found
-     *
-     * @return
-     */
-    private Messages.FindRoutersReply createEmptyReply() {
-        List<Pair<ActorRef, Long>> routerWithUpdateTime = Collections.emptyList();
-        return new Messages.FindRoutersReply(routerWithUpdateTime);
-    }
-
-    /**
-     * Helper to create a reply when routers are found for the given rpc
-     *
-     * @param buckets
-     * @param routeId
-     * @return
-     */
-    private Messages.FindRoutersReply createReplyWithRouters(
-            Map<Address, Bucket> buckets, RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
-
         List<Pair<ActorRef, Long>> routers = new ArrayList<>();
-        Option<Pair<ActorRef, Long>> routerWithUpdateTime = null;
-
-        for (Bucket bucket : buckets.values()) {
-
-            RoutingTable table = (RoutingTable) bucket.getData();
-            if (table == null)
-                continue;
 
-            routerWithUpdateTime = table.getRouterFor(routeId);
-            if (routerWithUpdateTime.isEmpty())
-                continue;
+        RouteIdentifier<?, ?, ?> routeId = msg.getRouteIdentifier();
+        findRoutes(getLocalBucket().getData(), routeId, routers);
 
-            routers.add(routerWithUpdateTime.get());
+        for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+            findRoutes(bucket.getData(), routeId, routers);
         }
 
-        return new Messages.FindRoutersReply(routers);
-    }
-
-
-    ///
-    ///private factories to create Mapper
-    ///
-
-    /**
-     * Receives all buckets returned from bucket store and finds routers for the buckets where given rpc(routeId) is found
-     *
-     * @param routeId the rpc
-     * @param sender  client who asked to find the routers.
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToGetRouter(
-            final RpcRouter.RouteIdentifier<?, ?, ?> routeId, final ActorRef sender) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-
-                if (replyMessage instanceof GetAllBucketsReply) {
-
-                    GetAllBucketsReply reply = (GetAllBucketsReply) replyMessage;
-                    Map<Address, Bucket> buckets = reply.getBuckets();
-
-                    if (buckets == null || buckets.isEmpty()) {
-                        sender.tell(createEmptyReply(), getSelf());
-                        return null;
-                    }
-
-                    sender.tell(createReplyWithRouters(buckets, routeId), getSelf());
-                }
-                return null;
-            }
-        };
-    }
-
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by removing the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to remote
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToRemoveRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-
-                    if (!table.isEmpty()) {
-                        for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                            table.removeRoute(routeId);
-                        }
-                    }
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
-                return null;
-            }
-        };
+        getSender().tell(new Messages.FindRoutersReply(routers), getSelf());
     }
 
-    /**
-     * Receives local bucket from bucket store and updates routing table in it by adding the route. Subsequently,
-     * it updates the local bucket in bucket store.
-     *
-     * @param routeIds rpc to add
-     * @return
-     */
-    private Mapper<Object, Void> getMapperToAddRoutes(final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds) {
-
-        return new Mapper<Object, Void>() {
-            @Override
-            public Void apply(Object replyMessage) {
-                if (replyMessage instanceof GetLocalBucketReply) {
-
-                    GetLocalBucketReply reply = (GetLocalBucketReply) replyMessage;
-                    Bucket<RoutingTable> bucket = reply.getBucket();
-
-                    if (bucket == null) {
-                        log.debug("Local bucket is null");
-                        return null;
-                    }
-
-                    RoutingTable table = bucket.getData();
-                    if (table == null)
-                        table = new RoutingTable();
-
-                    table.setRouter(localRouter);
-                    for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
-                        table.addRoute(routeId);
-                    }
-
-                    bucket.setData(table);
-
-                    UpdateBucket updateBucketMessage = new UpdateBucket(bucket);
-                    bucketStore.tell(updateBucketMessage, getSelf());
-                }
+    private void findRoutes(RoutingTable table, RpcRouter.RouteIdentifier<?, ?, ?> routeId,
+            List<Pair<ActorRef, Long>> routers) {
+        if (table == null) {
+            return;
+        }
 
-                return null;
-            }
-        };
+        Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
+        if(!routerWithUpdateTime.isEmpty()) {
+            routers.add(routerWithUpdateTime.get());
+        }
     }
 
     /**
index 01c77f1..b81175e 100644 (file)
@@ -16,6 +16,23 @@ public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable
 
     private T data;
 
+    public BucketImpl() {
+    }
+
+    public BucketImpl(T data) {
+        this.data = data;
+    }
+
+    public BucketImpl(Bucket<T> other) {
+        this.version = other.getVersion();
+        this.data = other.getData();
+    }
+
+    public void setData(T data) {
+        this.data = data;
+        this.version = System.currentTimeMillis()+1;
+    }
+
     @Override
     public Long getVersion() {
         return version;
@@ -23,15 +40,7 @@ public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable
 
     @Override
     public T getData() {
-        if (this.data == null)
-            return null;
-
-        return data.copy();
-    }
-
-    public void setData(T data){
-        this.version = System.currentTimeMillis()+1;
-        this.data = data;
+        return data;
     }
 
     @Override
index 6ffe147..934609b 100644 (file)
@@ -15,11 +15,10 @@ import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
@@ -28,9 +27,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.utils.ConditionalProbe;
 
@@ -43,24 +39,26 @@ import org.opendaylight.controller.utils.ConditionalProbe;
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore extends AbstractUntypedActorWithMetering {
+public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+
+    private static final Long NO_VERSION = -1L;
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
     /**
      * Bucket owned by the node
      */
-    private BucketImpl localBucket = new BucketImpl();
+    private final BucketImpl<T> localBucket = new BucketImpl<>();
 
     /**
      * Buckets ownded by other known nodes in the cluster
      */
-    private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+    private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
 
     /**
      * Bucket version for every known node in the cluster including this node
      */
-    private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+    private final Map<Address, Long> versions = new HashMap<>();
 
     /**
      * Cluster address for this node
@@ -85,7 +83,6 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         }
     }
 
-
     @Override
     protected void handleReceive(Object message) throws Exception {
         if (probe != null) {
@@ -98,20 +95,14 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
             probe = (ConditionalProbe) message;
             // Send back any message to tell the caller we got the probe.
             getSender().tell("Got it", getSelf());
-        } else if (message instanceof UpdateBucket) {
-            receiveUpdateBucket(((UpdateBucket) message).getBucket());
         } else if (message instanceof GetAllBuckets) {
-            receiveGetAllBucket();
-        } else if (message instanceof GetLocalBucket) {
-            receiveGetLocalBucket();
+            receiveGetAllBuckets();
         } else if (message instanceof GetBucketsByMembers) {
-            receiveGetBucketsByMembers(
-                    ((GetBucketsByMembers) message).getMembers());
+            receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
-            receiveUpdateRemoteBuckets(
-                    ((UpdateRemoteBuckets) message).getBuckets());
+            receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
         } else {
             if(log.isDebugEnabled()) {
                 log.debug("Unhandled message [{}]", message);
@@ -120,30 +111,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         }
     }
 
-    /**
-     * Returns a copy of bucket owned by this node
-     */
-    private void receiveGetLocalBucket() {
-        final ActorRef sender = getSender();
-        GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
-        sender.tell(reply, getSelf());
-    }
-
-    /**
-     * Updates the bucket owned by this node
-     *
-     * @param updatedBucket
-     */
-    void receiveUpdateBucket(Bucket updatedBucket){
-
-        localBucket = (BucketImpl) updatedBucket;
-        versions.put(selfAddress, localBucket.getVersion());
-    }
-
     /**
      * Returns all the buckets the this node knows about, self owned + remote
      */
-    void receiveGetAllBucket(){
+    void receiveGetAllBuckets(){
         final ActorRef sender = getSender();
         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
     }
@@ -153,11 +124,12 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      *
      * @return self owned + remote buckets
      */
+    @SuppressWarnings("rawtypes")
     Map<Address, Bucket> getAllBuckets(){
         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
-        all.put(selfAddress, localBucket);
+        all.put(selfAddress, new BucketImpl<>(localBucket));
 
         //then get all remote buckets
         all.putAll(remoteBuckets);
@@ -170,6 +142,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      *
      * @param members requested members
      */
+    @SuppressWarnings("rawtypes")
     void receiveGetBucketsByMembers(Set<Address> members){
         final ActorRef sender = getSender();
         Map<Address, Bucket> buckets = getBucketsByMembers(members);
@@ -182,12 +155,13 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      * @param members requested members
      * @return buckets for requested memebers
      */
+    @SuppressWarnings("rawtypes")
     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
         Map<Address, Bucket> buckets = new HashMap<>();
 
         //first add the local bucket if asked
         if (members.contains(selfAddress)) {
-            buckets.put(selfAddress, localBucket);
+            buckets.put(selfAddress, new BucketImpl<>(localBucket));
         }
 
         //then get buckets for requested remote nodes
@@ -215,8 +189,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
-
+        log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
         if (receivedBuckets == null || receivedBuckets.isEmpty())
          {
             return; //nothing to do
@@ -229,10 +204,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
 
             Long localVersion = versions.get(entry.getKey());
             if (localVersion == null) {
-                localVersion = -1L;
+                localVersion = NO_VERSION;
             }
 
-            Bucket receivedBucket = entry.getValue();
+            Bucket<T> receivedBucket = entry.getValue();
 
             if (receivedBucket == null) {
                 continue;
@@ -240,7 +215,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
 
             Long remoteVersion = receivedBucket.getVersion();
             if (remoteVersion == null) {
-                remoteVersion = -1L;
+                remoteVersion = NO_VERSION;
             }
 
             //update only if remote version is newer
@@ -249,40 +224,27 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
                 versions.put(entry.getKey(), remoteVersion);
             }
         }
+
         if(log.isDebugEnabled()) {
             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
         }
     }
 
-    ///
-    ///Getter Setters
-    ///
-
-    BucketImpl getLocalBucket() {
+    protected BucketImpl<T> getLocalBucket() {
         return localBucket;
     }
 
-    void setLocalBucket(BucketImpl localBucket) {
-        this.localBucket = localBucket;
+    protected void updateLocalBucket(T data) {
+        localBucket.setData(data);
+        versions.put(selfAddress, localBucket.getVersion());
     }
 
-    ConcurrentMap<Address, Bucket> getRemoteBuckets() {
+    protected Map<Address, Bucket<T>> getRemoteBuckets() {
         return remoteBuckets;
     }
 
-    void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
-        this.remoteBuckets = remoteBuckets;
-    }
-
-    ConcurrentMap<Address, Long> getVersions() {
+    @VisibleForTesting
+    Map<Address, Long> getVersions() {
         return versions;
     }
-
-    void setVersions(ConcurrentMap<Address, Long> versions) {
-        this.versions = versions;
-    }
-
-    Address getSelfAddress() {
-        return selfAddress;
-    }
 }
index 4e8f2c6..b05bd7d 100644 (file)
@@ -9,16 +9,14 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import akka.actor.Address;
 import com.google.common.base.Preconditions;
-
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.ContainsBuckets;
 
 
 /**
@@ -29,46 +27,13 @@ public class Messages {
 
     public static class BucketStoreMessages{
 
-        public static class GetLocalBucket implements Serializable {
-            private static final long serialVersionUID = 1L;
-        }
-
-        public static class ContainsBucket implements Serializable {
-            private static final long serialVersionUID = 1L;
-            final private Bucket bucket;
-
-            public ContainsBucket(Bucket bucket){
-                Preconditions.checkArgument(bucket != null, "bucket can not be null");
-                this.bucket = bucket;
-            }
-
-            public Bucket getBucket(){
-                return bucket;
-            }
-
-        }
-
-        public static class UpdateBucket extends ContainsBucket implements Serializable {
-            private static final long serialVersionUID = 1L;
-            public UpdateBucket(Bucket bucket){
-                super(bucket);
-            }
-        }
-
-        public static class GetLocalBucketReply extends ContainsBucket implements Serializable {
-            private static final long serialVersionUID = 1L;
-            public GetLocalBucketReply(Bucket bucket){
-                super(bucket);
-            }
-        }
-
         public static class GetAllBuckets implements Serializable {
             private static final long serialVersionUID = 1L;
         }
 
         public static class GetBucketsByMembers implements Serializable{
             private static final long serialVersionUID = 1L;
-            private Set<Address> members;
+            private final Set<Address> members;
 
             public GetBucketsByMembers(Set<Address> members){
                 Preconditions.checkArgument(members != null, "members can not be null");
@@ -82,7 +47,7 @@ public class Messages {
 
         public static class ContainsBuckets implements Serializable{
             private static final long serialVersionUID = 1L;
-            private Map<Address, Bucket> buckets;
+            private final Map<Address, Bucket> buckets;
 
             public ContainsBuckets(Map<Address, Bucket> buckets){
                 Preconditions.checkArgument(buckets != null, "buckets can not be null");
@@ -94,11 +59,12 @@ public class Messages {
 
                 for (Map.Entry<Address, Bucket> entry : buckets.entrySet()){
                     //ignore null entries
-                    if ( (entry.getKey() == null) || (entry.getValue() == null) )
+                    if ( (entry.getKey() == null) || (entry.getValue() == null) ) {
                         continue;
+                    }
                     copy.put(entry.getKey(), entry.getValue());
                 }
-                return new HashMap<>(copy);
+                return copy;
             }
         }
 
@@ -162,7 +128,7 @@ public class Messages {
 
         public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
             private static final long serialVersionUID = 1L;
-            private Address from;
+            private final Address from;
 
             public GossipStatus(Address from, Map<Address, Long> versions) {
                 super(versions);
index e0d145d..5b7b7e4 100644 (file)
@@ -1,38 +1,41 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
-import akka.actor.ActorPath;
 import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
-import akka.actor.ChildActorPath;
+import akka.actor.Address;
 import akka.actor.Props;
-import akka.pattern.Patterns;
+import akka.japi.Pair;
 import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import com.google.common.base.Predicate;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.utils.ConditionalProbe;
+import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -46,6 +49,8 @@ public class RpcRegistryTest {
     private ActorRef registry2;
     private ActorRef registry3;
 
+    private int routeIdCounter = 1;
+
     @BeforeClass
     public static void staticSetup() throws InterruptedException {
       RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
@@ -97,27 +102,30 @@ public class RpcRegistryTest {
 
         final JavaTestKit mockBroker = new JavaTestKit(node1);
 
-        final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
+        Address nodeAddress = node1.provider().getDefaultAddress();
 
         // Add rpc on node 1
         registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
 
-        // install probe
-        final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath,
-                Messages.BucketStoreMessages.UpdateBucket.class);
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
 
-        registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef());
 
         // Bucket store should get an update bucket message. Updated bucket contains added rpc.
-        probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        Map<Address, Bucket> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
+        verifyBucket(buckets.get(nodeAddress), addedRouteIds);
+
+        Map<Address, Long> versions = retrieveVersions(registry1, mockBroker);
+        Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(),
+                versions.get(nodeAddress));
 
         // Now remove rpc
-        registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
+        registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef());
 
         // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
-        probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        verifyEmptyBucket(mockBroker, registry1, nodeAddress);
 
         System.out.println("testAddRemoveRpcOnSameNode ending");
 
@@ -136,30 +144,52 @@ public class RpcRegistryTest {
         System.out.println("testRpcAddRemoveInCluster starting");
 
         final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
 
-        // install probe on node2's bucket store
-        final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
-        final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath,
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+        Address node1Address = node1.provider().getDefaultAddress();
 
         // Add rpc on node 1
         registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef());
 
         // Bucket store on node2 should get a message to update its local copy of remote buckets
-        probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        Map<Address, Bucket> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
+        verifyBucket(buckets.get(node1Address), addedRouteIds);
 
         // Now remove
-        registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
+        registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef());
 
-        // Bucket store on node2 should get a message to update its local copy of remote buckets
-        probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+        // Bucket store on node2 should get a message to update its local copy of remote buckets.
+        // Wait for the bucket for node1 to be empty.
+
+        verifyEmptyBucket(mockBroker2, registry2, node1Address);
 
         System.out.println("testRpcAddRemoveInCluster ending");
     }
 
+    private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
+            throws AssertionError {
+        Map<Address, Bucket> buckets;
+        int nTries = 0;
+        while(true) {
+            buckets = retrieveBuckets(registry1, testKit, address);
+
+            try {
+                verifyBucket(buckets.get(address), Collections.<RouteIdentifier<?, ?, ?>>emptyList());
+                break;
+            } catch (AssertionError e) {
+                if(++nTries >= 50) {
+                    throw e;
+                }
+            }
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
+    }
+
     /**
      * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
      *
@@ -174,76 +204,142 @@ public class RpcRegistryTest {
 
         registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
 
-        // install probe on node 3
-        final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
-        final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath,
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
         // Add rpc on node 1
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
         registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
-        probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+        registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef());
 
-        // Add same rpc on node 2
+        // Add rpc on node 2
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
         registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
-        registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+        registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef());
+
+        Address node1Address = node1.provider().getDefaultAddress();
+        Address node2Address = node2.provider().getDefaultAddress();
+
+        Map<Address, Bucket> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
+                node2Address);
 
-        probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
-                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+        verifyBucket(buckets.get(node1Address), addedRouteIds1);
+        verifyBucket(buckets.get(node2Address), addedRouteIds2);
+
+        Map<Address, Long> versions = retrieveVersions(registry3, mockBroker3);
+        Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(),
+                versions.get(node1Address));
+        Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(),
+                versions.get(node2Address));
+
+        RouteIdentifier<?, ?, ?> routeID = addedRouteIds1.get(0);
+        registry3.tell(new FindRouters(routeID), mockBroker3.getRef());
+
+        FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+                FindRoutersReply.class);
+
+        List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
+        Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
+
+        respList.get(0).first().tell("hello", ActorRef.noSender());
+        mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello");
     }
 
-    private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz)
-            throws Exception {
-        final JavaTestKit probe = new JavaTestKit(node);
-
-        ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
-            @Override
-            public boolean apply(@Nullable Object input) {
-                if (input != null) {
-                    return clazz.equals(input.getClass());
-                } else {
-                    return false;
-                }
+    private Map<Address, Long> retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) {
+        bucketStore.tell(new GetBucketVersions(), testKit.getRef());
+        GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+                GetBucketVersionsReply.class);
+        return reply.getVersions();
+    }
+
+    private void verifyBucket(Bucket<RoutingTable> bucket, List<RouteIdentifier<?, ?, ?>> expRouteIds) {
+        RoutingTable table = bucket.getData();
+        Assert.assertNotNull("Bucket RoutingTable is null", table);
+        for(RouteIdentifier<?, ?, ?> r: expRouteIds) {
+            if(!table.contains(r)) {
+                Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
             }
-        });
+        }
 
-        FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
-        Timeout timeout = new Timeout(duration);
-        int maxTries = 30;
-        int i = 0;
-        while(true) {
-            ActorSelection subject = node.actorSelection(subjectPath);
-            Future<Object> future = Patterns.ask(subject, conditionalProbe, timeout);
+        Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
+    }
 
-            try {
-                Await.ready(future, duration);
-                break;
-            } catch (TimeoutException | InterruptedException e) {
-                if(++i > maxTries) {
-                    throw e;
+    private Map<Address, Bucket> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
+            Address... addresses) {
+        int nTries = 0;
+        while(true) {
+            bucketStore.tell(new GetAllBuckets(), testKit.getRef());
+            GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
+                    GetAllBucketsReply.class);
+
+            Map<Address, Bucket> buckets = reply.getBuckets();
+            boolean foundAll = true;
+            for(Address addr: addresses) {
+                Bucket bucket = buckets.get(addr);
+                if(bucket  == null) {
+                    foundAll = false;
+                    break;
                 }
             }
-        }
 
-        return probe;
+            if(foundAll) {
+                return buckets;
+            }
 
-    }
+            if(++nTries >= 50) {
+                Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
+                        + ", Actual: " + buckets);
+            }
 
-    private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
-        return new AddOrUpdateRoutes(createRouteIds());
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
     }
 
-    private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
-        return new RemoveRoutes(createRouteIds());
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAddRoutesConcurrency() throws Exception {
+        final JavaTestKit testKit = new JavaTestKit(node1);
+
+        registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender());
+
+        final int nRoutes = 500;
+        final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
+        for(int i = 0; i < nRoutes; i++) {
+            final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
+                    new QName(new URI("/mockrpc"), "type" + i), null);
+            added[i] = routeId;
+
+            //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            registry1.tell(new AddOrUpdateRoutes(Arrays.<RouteIdentifier<?, ?, ?>>asList(routeId)),
+                    ActorRef.noSender());
+        }
+
+        GetAllBuckets getAllBuckets = new GetAllBuckets();
+        FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
+        int nTries = 0;
+        while(true) {
+            registry1.tell(getAllBuckets, testKit.getRef());
+            GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
+
+            Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
+            RoutingTable table = localBucket.getData();
+            if(table != null && table.size() == nRoutes) {
+                for(RouteIdentifier<?, ?, ?> r: added) {
+                    Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
+                }
+
+                break;
+            }
+
+            if(++nTries >= 50) {
+                Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
+            }
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+        }
     }
 
     private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
-        QName type = new QName(new URI("/mockrpc"), "mockrpc");
+        QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
         List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
         routeIds.add(new RouteIdentifierImpl(null, type, null));
         return routeIds;
     }
-
 }
index 78fcbd3..ddd08a5 100644 (file)
@@ -12,27 +12,23 @@ import akka.actor.Address;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
 import com.typesafe.config.ConfigFactory;
+import java.util.HashMap;
+import java.util.Map;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.TerminationMonitor;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class BucketStoreTest {
 
     private static ActorSystem system;
-    private static BucketStore store;
 
     @BeforeClass
     public static void setup() {
 
         system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
         system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
-
-        store = createStore();
     }
 
     @AfterClass
@@ -40,21 +36,6 @@ public class BucketStoreTest {
         system.shutdown();
     }
 
-    /**
-     * Given a new local bucket
-     * Should replace
-     */
-    @Test
-    public void testReceiveUpdateBucket(){
-        Bucket bucket = new BucketImpl();
-        Long expectedVersion = bucket.getVersion();
-
-        store.receiveUpdateBucket(bucket);
-
-        Assert.assertEquals(bucket, store.getLocalBucket());
-        Assert.assertEquals(expectedVersion, store.getLocalBucket().getVersion());
-    }
-
     /**
      * Given remote buckets
      * Should merge with local copy of remote buckets
@@ -62,6 +43,8 @@ public class BucketStoreTest {
     @Test
     public void testReceiveUpdateRemoteBuckets(){
 
+        BucketStore store = createStore();
+
         Address localAddress = system.provider().getDefaultAddress();
         Bucket localBucket = new BucketImpl();
 
@@ -84,7 +67,7 @@ public class BucketStoreTest {
 
         //Should NOT contain local bucket
         //Should contain ONLY 3 entries i.e a1, a2, a3
-        Map<Address, Bucket> remoteBucketsInStore = store.getRemoteBuckets();
+        Map<Address, Bucket<?>> remoteBucketsInStore = store.getRemoteBuckets();
         Assert.assertFalse("remote buckets contains local bucket", remoteBucketsInStore.containsKey(localAddress));
         Assert.assertTrue(remoteBucketsInStore.size() == 3);
 
@@ -122,11 +105,9 @@ public class BucketStoreTest {
         Assert.assertTrue(remoteBucketsInStore.size() == 4);
 
         //Should update versions map
-        //versions map contains versions for all remote buckets (4) + local bucket
-        //so it should have total 5.
+        //versions map contains versions for all remote buckets (4).
         Map<Address, Long> versionsInStore = store.getVersions();
-        Assert.assertTrue(String.format("Expected:%s, Actual:%s", 5, versionsInStore.size()),
-                          versionsInStore.size() == 5);
+        Assert.assertEquals(4, versionsInStore.size());
         Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1));
         Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2));
         Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3));