BUG-7573: add BucketStore source monitoring
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
index 3de3fc00d0328215ef8f127e8272458f14326708..b4af0adfe5f4160868752aab7e0210a8607c3910 100644 (file)
@@ -11,149 +11,139 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
-import akka.actor.Props;
+import akka.actor.Terminated;
 import akka.cluster.ClusterActorRefProvider;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.utils.ConditionalProbe;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.utils.ConditionalProbe;
 
 /**
  * A store that syncs its data across nodes in the cluster.
  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
+ *
  * <p>
- * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore extends AbstractUntypedActorWithMetering {
-
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
+    /**
+     * Bucket owned by the node.
+     */
+    private final BucketImpl<T> localBucket;
 
     /**
-     * Bucket owned by the node
+     * Buckets owned by other known nodes in the cluster.
      */
-    private BucketImpl localBucket = new BucketImpl();
+    private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
 
     /**
-     * Buckets ownded by other known nodes in the cluster
+     * Bucket version for every known node in the cluster including this node.
      */
-    private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+    private final Map<Address, Long> versions = new HashMap<>();
 
     /**
-     * Bucket version for every known node in the cluster including this node
+     * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
+     * once, possibly being tied to multiple addresses (and by extension, buckets).
      */
-    private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+    private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
 
     /**
-     * Cluster address for this node
+     * Cluster address for this node.
      */
     private Address selfAddress;
 
+    // FIXME: should be part of test-specific subclass
     private ConditionalProbe probe;
 
     private final RemoteRpcProviderConfig config;
 
-    public BucketStore(){
-        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+    public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
+        this.config = Preconditions.checkNotNull(config);
+        this.localBucket = new BucketImpl<>(initialData);
     }
 
     @Override
-    public void preStart(){
+    public void preStart() {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        if ( provider instanceof ClusterActorRefProvider)
-            getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+        if (provider instanceof ClusterActorRefProvider) {
+            getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
+        }
     }
 
-
+    @SuppressWarnings("unchecked")
     @Override
-    protected void handleReceive(Object message) throws Exception {
+    protected void handleReceive(final Object message) throws Exception {
         if (probe != null) {
             probe.tell(message, getSelf());
         }
 
-        if (message instanceof ConditionalProbe) {
-            log.info("Received probe {} {}", getSelf(), message);
-            probe = (ConditionalProbe) message;
-        } else if (message instanceof UpdateBucket) {
-            receiveUpdateBucket(((UpdateBucket) message).getBucket());
-        } else if (message instanceof GetAllBuckets) {
-            receiveGetAllBucket();
-        } else if (message instanceof GetLocalBucket) {
-            receiveGetLocalBucket();
-        } else if (message instanceof GetBucketsByMembers) {
-            receiveGetBucketsByMembers(
-                    ((GetBucketsByMembers) message).getMembers());
+        if (message instanceof GetBucketsByMembers) {
+            receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
-            receiveUpdateRemoteBuckets(
-                    ((UpdateRemoteBuckets) message).getBuckets());
+            receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
+        } else if (message instanceof RemoveRemoteBucket) {
+            removeBucket(((RemoveRemoteBucket) message).getAddress());
+        } else if (message instanceof Terminated) {
+            actorTerminated((Terminated) message);
+        } else if (message instanceof GetAllBuckets) {
+            // GetAllBuckets is used only for unit tests.
+            receiveGetAllBuckets();
+        } else if (message instanceof ConditionalProbe) {
+            // The ConditionalProbe is only used for unit tests.
+            LOG.info("Received probe {} {}", getSelf(), message);
+            probe = (ConditionalProbe) message;
+            // Send back any message to tell the caller we got the probe.
+            getSender().tell("Got it", getSelf());
         } else {
-            log.debug("Unhandled message [{}]", message);
+            LOG.debug("Unhandled message [{}]", message);
             unhandled(message);
         }
     }
 
-    /**
-     * Returns a copy of bucket owned by this node
-     */
-    private void receiveGetLocalBucket() {
-        final ActorRef sender = getSender();
-        GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
-        sender.tell(reply, getSelf());
+    protected RemoteRpcProviderConfig getConfig() {
+        return config;
     }
 
     /**
-     * Updates the bucket owned by this node
-     *
-     * @param updatedBucket
+     * Returns all the buckets the this node knows about, self owned + remote.
      */
-    void receiveUpdateBucket(Bucket updatedBucket){
-
-        localBucket = (BucketImpl) updatedBucket;
-        versions.put(selfAddress, localBucket.getVersion());
-    }
-
-    /**
-     * Returns all the buckets the this node knows about, self owned + remote
-     */
-    void receiveGetAllBucket(){
+    void receiveGetAllBuckets() {
         final ActorRef sender = getSender();
-        sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
+        sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
     }
 
     /**
-     * Helper to collect all known buckets
+     * Helper to collect all known buckets.
      *
      * @return self owned + remote buckets
      */
-    Map<Address, Bucket> getAllBuckets(){
-        Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
+    Map<Address, Bucket<T>> getAllBuckets() {
+        Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
 
         //first add the local bucket
-        all.put(selfAddress, localBucket);
+        all.put(selfAddress, new BucketImpl<>(localBucket));
 
         //then get all remote buckets
         all.putAll(remoteBuckets);
@@ -162,113 +152,175 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
     }
 
     /**
-     * Returns buckets for requested members that this node knows about
+     * Returns buckets for requested members that this node knows about.
      *
      * @param members requested members
      */
-    void receiveGetBucketsByMembers(Set<Address> members){
+    void receiveGetBucketsByMembers(final Set<Address> members) {
         final ActorRef sender = getSender();
-        Map<Address, Bucket> buckets = getBucketsByMembers(members);
-        sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
+        Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
+        sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
     }
 
     /**
-     * Helper to collect buckets for requested memebers
+     * Helper to collect buckets for requested members.
      *
      * @param members requested members
-     * @return buckets for requested memebers
+     * @return buckets for requested members
      */
-    Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
-        Map<Address, Bucket> buckets = new HashMap<>();
+    Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
+        Map<Address, Bucket<T>> buckets = new HashMap<>();
 
         //first add the local bucket if asked
-        if (members.contains(selfAddress))
-            buckets.put(selfAddress, localBucket);
+        if (members.contains(selfAddress)) {
+            buckets.put(selfAddress, new BucketImpl<>(localBucket));
+        }
 
         //then get buckets for requested remote nodes
-        for (Address address : members){
-            if (remoteBuckets.containsKey(address))
+        for (Address address : members) {
+            if (remoteBuckets.containsKey(address)) {
                 buckets.put(address, remoteBuckets.get(address));
+            }
         }
 
         return buckets;
     }
 
     /**
-     * Returns versions for all buckets known
+     * Returns versions for all buckets known.
      */
-    void receiveGetBucketVersions(){
+    void receiveGetBucketVersions() {
         final ActorRef sender = getSender();
         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
         sender.tell(reply, getSelf());
     }
 
     /**
-     * Update local copy of remote buckets where local copy's version is older
+     * Update local copy of remote buckets where local copy's version is older.
      *
      * @param receivedBuckets buckets sent by remote
      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
      */
-    void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
+    void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+        LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
+        if (receivedBuckets == null || receivedBuckets.isEmpty()) {
+            //nothing to do
+            return;
+        }
 
-        if (receivedBuckets == null || receivedBuckets.isEmpty())
-            return; //nothing to do
+        final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
+        for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+            final Address addr = entry.getKey();
 
-        //Remote cant update self's bucket
-        receivedBuckets.remove(selfAddress);
+            if (selfAddress.equals(addr)) {
+                // Remote cannot update our bucket
+                continue;
+            }
 
-        for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
+            final Bucket<T> receivedBucket = entry.getValue();
+            if (receivedBucket == null) {
+                LOG.debug("Ignoring null bucket from {}", addr);
+                continue;
+            }
 
-            Long localVersion = versions.get(entry.getKey());
-            if (localVersion == null) localVersion = -1L;
+            // update only if remote version is newer
+            final long remoteVersion = receivedBucket.getVersion();
+            final Long localVersion = versions.get(addr);
+            if (localVersion != null && remoteVersion <= localVersion.longValue()) {
+                LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
+                    remoteVersion);
+                continue;
+            }
+            newBuckets.put(addr, receivedBucket);
+            versions.put(addr, remoteVersion);
+            final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
+
+            // Deal with DeathWatch subscriptions
+            final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
+            final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
+            if (!curRef.equals(prevRef)) {
+                prevRef.ifPresent(ref -> removeWatch(addr, ref));
+                curRef.ifPresent(ref -> addWatch(addr, ref));
+            }
 
-            Bucket receivedBucket = entry.getValue();
+            LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
+        }
 
-            if (receivedBucket == null)
-                continue;
+        LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
 
-            Long remoteVersion = receivedBucket.getVersion();
-            if (remoteVersion == null) remoteVersion = -1L;
+        onBucketsUpdated(newBuckets);
+    }
 
-            //update only if remote version is newer
-            if ( remoteVersion.longValue() > localVersion.longValue() ) {
-                remoteBuckets.put(entry.getKey(), receivedBucket);
-                versions.put(entry.getKey(), remoteVersion);
-            }
+    private void addWatch(final Address addr, final ActorRef ref) {
+        if (!watchedActors.containsKey(ref)) {
+            getContext().watch(ref);
+            LOG.debug("Watching {}", ref);
         }
+        watchedActors.put(ref, addr);
+    }
 
-        log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+    private void removeWatch(final Address addr, final ActorRef ref) {
+        watchedActors.remove(ref, addr);
+        if (!watchedActors.containsKey(ref)) {
+            getContext().unwatch(ref);
+            LOG.debug("No longer watching {}", ref);
+        }
     }
 
-    ///
-    ///Getter Setters
-    ///
+    private void removeBucket(final Address addr) {
+        final Bucket<T> bucket = remoteBuckets.remove(addr);
+        if (bucket != null) {
+            bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+            onBucketRemoved(addr, bucket);
+        }
+    }
 
-    BucketImpl getLocalBucket() {
-        return localBucket;
+    private void actorTerminated(final Terminated message) {
+        LOG.info("Actor termination {} received", message);
+
+        for (Address addr : watchedActors.removeAll(message.getActor())) {
+            versions.remove(addr);
+            final Bucket<T> bucket = remoteBuckets.remove(addr);
+            if (bucket != null) {
+                LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+                onBucketRemoved(addr, bucket);
+            }
+        }
     }
 
-    void setLocalBucket(BucketImpl localBucket) {
-        this.localBucket = localBucket;
+    /**
+     * Callback to subclasses invoked when a bucket is removed.
+     *
+     * @param address Remote address
+     * @param bucket Bucket removed
+     */
+    protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
+        // Default noop
     }
 
-    ConcurrentMap<Address, Bucket> getRemoteBuckets() {
-        return remoteBuckets;
+    /**
+     * Callback to subclasses invoked when the set of remote buckets is updated.
+     *
+     * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+     */
+    protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
+        // Default noop
     }
 
-    void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
-        this.remoteBuckets = remoteBuckets;
+    public BucketImpl<T> getLocalBucket() {
+        return localBucket;
     }
 
-    ConcurrentMap<Address, Long> getVersions() {
-        return versions;
+    protected void updateLocalBucket(final T data) {
+        localBucket.setData(data);
+        versions.put(selfAddress, localBucket.getVersion());
     }
 
-    void setVersions(ConcurrentMap<Address, Long> versions) {
-        this.versions = versions;
+    public Map<Address, Bucket<T>> getRemoteBuckets() {
+        return remoteBuckets;
     }
 
-    Address getSelfAddress() {
-        return selfAddress;
+    public Map<Address, Long> getVersions() {
+        return versions;
     }
 }