BUG-7573: add BucketStore source monitoring
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
index 7ae091d19983e0b692e9f654605fc184d8d4b6a6..b4af0adfe5f4160868752aab7e0210a8607c3910 100644 (file)
@@ -11,11 +11,15 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
+import akka.actor.Terminated;
 import akka.cluster.ClusterActorRefProvider;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
@@ -39,7 +43,7 @@ import org.opendaylight.controller.utils.ConditionalProbe;
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
     /**
      * Bucket owned by the node.
      */
@@ -55,6 +59,12 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      */
     private final Map<Address, Long> versions = new HashMap<>();
 
+    /**
+     * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
+     * once, possibly being tied to multiple addresses (and by extension, buckets).
+     */
+    private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
+
     /**
      * Cluster address for this node.
      */
@@ -95,6 +105,8 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
         } else if (message instanceof RemoveRemoteBucket) {
             removeBucket(((RemoveRemoteBucket) message).getAddress());
+        } else if (message instanceof Terminated) {
+            actorTerminated((Terminated) message);
         } else if (message instanceof GetAllBuckets) {
             // GetAllBuckets is used only for unit tests.
             receiveGetAllBuckets();
@@ -198,29 +210,39 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
 
         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
         for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
-            if (selfAddress.equals(entry.getKey())) {
+            final Address addr = entry.getKey();
+
+            if (selfAddress.equals(addr)) {
                 // Remote cannot update our bucket
                 continue;
             }
 
             final Bucket<T> receivedBucket = entry.getValue();
             if (receivedBucket == null) {
-                LOG.debug("Ignoring null bucket from {}", entry.getKey());
+                LOG.debug("Ignoring null bucket from {}", addr);
                 continue;
             }
 
             // update only if remote version is newer
             final long remoteVersion = receivedBucket.getVersion();
-            final Long localVersion = versions.get(entry.getKey());
+            final Long localVersion = versions.get(addr);
             if (localVersion != null && remoteVersion <= localVersion.longValue()) {
-                LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
+                LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
                     remoteVersion);
                 continue;
             }
+            newBuckets.put(addr, receivedBucket);
+            versions.put(addr, remoteVersion);
+            final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
+
+            // Deal with DeathWatch subscriptions
+            final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
+            final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
+            if (!curRef.equals(prevRef)) {
+                prevRef.ifPresent(ref -> removeWatch(addr, ref));
+                curRef.ifPresent(ref -> addWatch(addr, ref));
+            }
 
-            newBuckets.put(entry.getKey(), receivedBucket);
-            remoteBuckets.put(entry.getKey(), receivedBucket);
-            versions.put(entry.getKey(), remoteVersion);
             LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
         }
 
@@ -229,10 +251,40 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         onBucketsUpdated(newBuckets);
     }
 
-    private void removeBucket(final Address address) {
-        final Bucket<T> bucket = remoteBuckets.remove(address);
+    private void addWatch(final Address addr, final ActorRef ref) {
+        if (!watchedActors.containsKey(ref)) {
+            getContext().watch(ref);
+            LOG.debug("Watching {}", ref);
+        }
+        watchedActors.put(ref, addr);
+    }
+
+    private void removeWatch(final Address addr, final ActorRef ref) {
+        watchedActors.remove(ref, addr);
+        if (!watchedActors.containsKey(ref)) {
+            getContext().unwatch(ref);
+            LOG.debug("No longer watching {}", ref);
+        }
+    }
+
+    private void removeBucket(final Address addr) {
+        final Bucket<T> bucket = remoteBuckets.remove(addr);
         if (bucket != null) {
-            onBucketRemoved(address, bucket);
+            bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+            onBucketRemoved(addr, bucket);
+        }
+    }
+
+    private void actorTerminated(final Terminated message) {
+        LOG.info("Actor termination {} received", message);
+
+        for (Address addr : watchedActors.removeAll(message.getActor())) {
+            versions.remove(addr);
+            final Bucket<T> bucket = remoteBuckets.remove(addr);
+            if (bucket != null) {
+                LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+                onBucketRemoved(addr, bucket);
+            }
         }
     }