BUG-3128: rework sal-remoterpc-connector
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
index 81e6a9ccc3c253e0e409e37e75f8d7ad1a4af888..7ae091d19983e0b692e9f654605fc184d8d4b6a6 100644 (file)
@@ -11,11 +11,11 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
-import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
@@ -25,10 +25,9 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
 import org.opendaylight.controller.utils.ConditionalProbe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -41,18 +40,13 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
-
-    private static final Long NO_VERSION = -1L;
-
-    protected final Logger log = LoggerFactory.getLogger(getClass());
-
     /**
      * Bucket owned by the node.
      */
-    private final BucketImpl<T> localBucket = new BucketImpl<>();
+    private final BucketImpl<T> localBucket;
 
     /**
-     * Buckets ownded by other known nodes in the cluster.
+     * Buckets owned by other known nodes in the cluster.
      */
     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
 
@@ -66,12 +60,14 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      */
     private Address selfAddress;
 
+    // FIXME: should be part of test-specific subclass
     private ConditionalProbe probe;
 
     private final RemoteRpcProviderConfig config;
 
-    public BucketStore(RemoteRpcProviderConfig config) {
+    public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
         this.config = Preconditions.checkNotNull(config);
+        this.localBucket = new BucketImpl<>(initialData);
     }
 
     @Override
@@ -79,34 +75,37 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        if ( provider instanceof ClusterActorRefProvider) {
-            getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper");
+        if (provider instanceof ClusterActorRefProvider) {
+            getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
         }
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    protected void handleReceive(Object message) throws Exception {
+    protected void handleReceive(final Object message) throws Exception {
         if (probe != null) {
             probe.tell(message, getSelf());
         }
 
-        if (message instanceof ConditionalProbe) {
-            // The ConditionalProbe is only used for unit tests.
-            log.info("Received probe {} {}", getSelf(), message);
-            probe = (ConditionalProbe) message;
-            // Send back any message to tell the caller we got the probe.
-            getSender().tell("Got it", getSelf());
-        } else if (message instanceof GetAllBuckets) {
-            receiveGetAllBuckets();
-        } else if (message instanceof GetBucketsByMembers) {
+        if (message instanceof GetBucketsByMembers) {
             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
+        } else if (message instanceof RemoveRemoteBucket) {
+            removeBucket(((RemoveRemoteBucket) message).getAddress());
+        } else if (message instanceof GetAllBuckets) {
+            // GetAllBuckets is used only for unit tests.
+            receiveGetAllBuckets();
+        } else if (message instanceof ConditionalProbe) {
+            // The ConditionalProbe is only used for unit tests.
+            LOG.info("Received probe {} {}", getSelf(), message);
+            probe = (ConditionalProbe) message;
+            // Send back any message to tell the caller we got the probe.
+            getSender().tell("Got it", getSelf());
         } else {
-            log.debug("Unhandled message [{}]", message);
+            LOG.debug("Unhandled message [{}]", message);
             unhandled(message);
         }
     }
@@ -145,7 +144,7 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      *
      * @param members requested members
      */
-    void receiveGetBucketsByMembers(Set<Address> members) {
+    void receiveGetBucketsByMembers(final Set<Address> members) {
         final ActorRef sender = getSender();
         Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
         sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
@@ -157,7 +156,7 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      * @param members requested members
      * @return buckets for requested members
      */
-    Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
+    Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
         Map<Address, Bucket<T>> buckets = new HashMap<>();
 
         //first add the local bucket if asked
@@ -190,53 +189,77 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
-    void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets) {
-        log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
+    void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+        LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
-            return; //nothing to do
+            //nothing to do
+            return;
         }
 
-        //Remote cant update self's bucket
-        receivedBuckets.remove(selfAddress);
-
-        for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
-
-            Long localVersion = versions.get(entry.getKey());
-            if (localVersion == null) {
-                localVersion = NO_VERSION;
+        final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
+        for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+            if (selfAddress.equals(entry.getKey())) {
+                // Remote cannot update our bucket
+                continue;
             }
 
-            Bucket<T> receivedBucket = entry.getValue();
-
+            final Bucket<T> receivedBucket = entry.getValue();
             if (receivedBucket == null) {
+                LOG.debug("Ignoring null bucket from {}", entry.getKey());
                 continue;
             }
 
-            Long remoteVersion = receivedBucket.getVersion();
-            if (remoteVersion == null) {
-                remoteVersion = NO_VERSION;
+            // update only if remote version is newer
+            final long remoteVersion = receivedBucket.getVersion();
+            final Long localVersion = versions.get(entry.getKey());
+            if (localVersion != null && remoteVersion <= localVersion.longValue()) {
+                LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
+                    remoteVersion);
+                continue;
             }
 
-            //update only if remote version is newer
-            if ( remoteVersion.longValue() > localVersion.longValue() ) {
-                remoteBuckets.put(entry.getKey(), receivedBucket);
-                versions.put(entry.getKey(), remoteVersion);
-            }
+            newBuckets.put(entry.getKey(), receivedBucket);
+            remoteBuckets.put(entry.getKey(), receivedBucket);
+            versions.put(entry.getKey(), remoteVersion);
+            LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
         }
 
-        log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+        LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+
+        onBucketsUpdated(newBuckets);
+    }
+
+    private void removeBucket(final Address address) {
+        final Bucket<T> bucket = remoteBuckets.remove(address);
+        if (bucket != null) {
+            onBucketRemoved(address, bucket);
+        }
+    }
 
-        onBucketsUpdated();
+    /**
+     * Callback to subclasses invoked when a bucket is removed.
+     *
+     * @param address Remote address
+     * @param bucket Bucket removed
+     */
+    protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
+        // Default noop
     }
 
-    protected void onBucketsUpdated() {
+    /**
+     * Callback to subclasses invoked when the set of remote buckets is updated.
+     *
+     * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+     */
+    protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
+        // Default noop
     }
 
     public BucketImpl<T> getLocalBucket() {
         return localBucket;
     }
 
-    protected void updateLocalBucket(T data) {
+    protected void updateLocalBucket(final T data) {
         localBucket.setData(data);
         versions.put(selfAddress, localBucket.getVersion());
     }