Bug 2526: Race condition may cause missing routes in RPC BucketStore
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
index 6ffe147e71384b66ece31c4e701294c8630456c8..934609b7cfcfeb8ea14a8c1ff803c53ba938b1aa 100644 (file)
@@ -15,11 +15,10 @@ import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
@@ -28,9 +27,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.utils.ConditionalProbe;
 
@@ -43,24 +39,26 @@ import org.opendaylight.controller.utils.ConditionalProbe;
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore extends AbstractUntypedActorWithMetering {
+public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+
+    private static final Long NO_VERSION = -1L;
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
     /**
      * Bucket owned by the node
      */
-    private BucketImpl localBucket = new BucketImpl();
+    private final BucketImpl<T> localBucket = new BucketImpl<>();
 
     /**
      * Buckets ownded by other known nodes in the cluster
      */
-    private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+    private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
 
     /**
      * Bucket version for every known node in the cluster including this node
      */
-    private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+    private final Map<Address, Long> versions = new HashMap<>();
 
     /**
      * Cluster address for this node
@@ -85,7 +83,6 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         }
     }
 
-
     @Override
     protected void handleReceive(Object message) throws Exception {
         if (probe != null) {
@@ -98,20 +95,14 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
             probe = (ConditionalProbe) message;
             // Send back any message to tell the caller we got the probe.
             getSender().tell("Got it", getSelf());
-        } else if (message instanceof UpdateBucket) {
-            receiveUpdateBucket(((UpdateBucket) message).getBucket());
         } else if (message instanceof GetAllBuckets) {
-            receiveGetAllBucket();
-        } else if (message instanceof GetLocalBucket) {
-            receiveGetLocalBucket();
+            receiveGetAllBuckets();
         } else if (message instanceof GetBucketsByMembers) {
-            receiveGetBucketsByMembers(
-                    ((GetBucketsByMembers) message).getMembers());
+            receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
-            receiveUpdateRemoteBuckets(
-                    ((UpdateRemoteBuckets) message).getBuckets());
+            receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
         } else {
             if(log.isDebugEnabled()) {
                 log.debug("Unhandled message [{}]", message);
@@ -120,30 +111,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         }
     }
 
-    /**
-     * Returns a copy of bucket owned by this node
-     */
-    private void receiveGetLocalBucket() {
-        final ActorRef sender = getSender();
-        GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
-        sender.tell(reply, getSelf());
-    }
-
-    /**
-     * Updates the bucket owned by this node
-     *
-     * @param updatedBucket
-     */
-    void receiveUpdateBucket(Bucket updatedBucket){
-
-        localBucket = (BucketImpl) updatedBucket;
-        versions.put(selfAddress, localBucket.getVersion());
-    }
-
     /**
      * Returns all the buckets the this node knows about, self owned + remote
      */
-    void receiveGetAllBucket(){
+    void receiveGetAllBuckets(){
         final ActorRef sender = getSender();
         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
     }
@@ -153,11 +124,12 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      *
      * @return self owned + remote buckets
      */
+    @SuppressWarnings("rawtypes")
     Map<Address, Bucket> getAllBuckets(){
         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
-        all.put(selfAddress, localBucket);
+        all.put(selfAddress, new BucketImpl<>(localBucket));
 
         //then get all remote buckets
         all.putAll(remoteBuckets);
@@ -170,6 +142,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      *
      * @param members requested members
      */
+    @SuppressWarnings("rawtypes")
     void receiveGetBucketsByMembers(Set<Address> members){
         final ActorRef sender = getSender();
         Map<Address, Bucket> buckets = getBucketsByMembers(members);
@@ -182,12 +155,13 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      * @param members requested members
      * @return buckets for requested memebers
      */
+    @SuppressWarnings("rawtypes")
     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
         Map<Address, Bucket> buckets = new HashMap<>();
 
         //first add the local bucket if asked
         if (members.contains(selfAddress)) {
-            buckets.put(selfAddress, localBucket);
+            buckets.put(selfAddress, new BucketImpl<>(localBucket));
         }
 
         //then get buckets for requested remote nodes
@@ -215,8 +189,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
-
+        log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
         if (receivedBuckets == null || receivedBuckets.isEmpty())
          {
             return; //nothing to do
@@ -229,10 +204,10 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
 
             Long localVersion = versions.get(entry.getKey());
             if (localVersion == null) {
-                localVersion = -1L;
+                localVersion = NO_VERSION;
             }
 
-            Bucket receivedBucket = entry.getValue();
+            Bucket<T> receivedBucket = entry.getValue();
 
             if (receivedBucket == null) {
                 continue;
@@ -240,7 +215,7 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
 
             Long remoteVersion = receivedBucket.getVersion();
             if (remoteVersion == null) {
-                remoteVersion = -1L;
+                remoteVersion = NO_VERSION;
             }
 
             //update only if remote version is newer
@@ -249,40 +224,27 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
                 versions.put(entry.getKey(), remoteVersion);
             }
         }
+
         if(log.isDebugEnabled()) {
             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
         }
     }
 
-    ///
-    ///Getter Setters
-    ///
-
-    BucketImpl getLocalBucket() {
+    protected BucketImpl<T> getLocalBucket() {
         return localBucket;
     }
 
-    void setLocalBucket(BucketImpl localBucket) {
-        this.localBucket = localBucket;
+    protected void updateLocalBucket(T data) {
+        localBucket.setData(data);
+        versions.put(selfAddress, localBucket.getVersion());
     }
 
-    ConcurrentMap<Address, Bucket> getRemoteBuckets() {
+    protected Map<Address, Bucket<T>> getRemoteBuckets() {
         return remoteBuckets;
     }
 
-    void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
-        this.remoteBuckets = remoteBuckets;
-    }
-
-    ConcurrentMap<Address, Long> getVersions() {
+    @VisibleForTesting
+    Map<Address, Long> getVersions() {
         return versions;
     }
-
-    void setVersions(ConcurrentMap<Address, Long> versions) {
-        this.versions = versions;
-    }
-
-    Address getSelfAddress() {
-        return selfAddress;
-    }
 }