BUG-7573: add BucketStore source monitoring 85/50585/12
authorRobert Varga <rovarga@cisco.com>
Tue, 17 Jan 2017 23:09:34 +0000 (00:09 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 30 Jan 2017 13:52:44 +0000 (14:52 +0100)
Add BucketData interface capture, which exposes an optional ActorRef.
If this reference is given for a Bucket's data, the bucket will be tied
to the source actor's lifecycle via DeathWatch.

If such an actor is not provided, only basic cluster-level monitoring
will be done.

Change-Id: I794bbf9b360d0c3bf68b29e6869a4f5c7c0d2470
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketData.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java

index 0c68b55..90b069e 100644 (file)
@@ -12,12 +12,13 @@ import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
+import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
 
-public class RoutingTable implements Copier<RoutingTable>, Serializable {
+public class RoutingTable implements BucketData<RoutingTable>, Serializable {
     private static final long serialVersionUID = 5592610415175278760L;
 
     private final Map<RouteIdentifier<?, ?, ?>, Long> table;
@@ -37,6 +38,11 @@ public class RoutingTable implements Copier<RoutingTable>, Serializable {
         return new RoutingTable(router, new HashMap<>(table));
     }
 
+    @Override
+    public Optional<ActorRef> getWatchActor() {
+        return Optional.of(router);
+    }
+
     public Set<RpcRouter.RouteIdentifier<?, ?, ?>> getRoutes() {
         return table.keySet();
     }
index c35316a..4e3e551 100644 (file)
@@ -7,8 +7,15 @@
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
-public interface Bucket<T extends Copier<T>> {
+import akka.actor.ActorRef;
+import java.util.Optional;
+
+public interface Bucket<T extends BucketData<T>> {
     long getVersion();
 
     T getData();
+
+    default Optional<ActorRef> getWatchActor() {
+        return getData().getWatchActor();
+    }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketData.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketData.java
new file mode 100644 (file)
index 0000000..6da3d94
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.remote.rpc.registry.gossip;
+
+import akka.actor.ActorRef;
+import java.util.Optional;
+
+/**
+ * Marker interface for data which is able to be held in a {@link Bucket}.
+ *
+ * @author Robert Varga
+ *
+ * @param <T> Concrete BucketData type
+ */
+public interface BucketData<T extends BucketData<T>> extends Copier<T> {
+    /**
+     * Return the {@link ActorRef} which should be tracked as the authoritative source of this bucket's data.
+     * The bucket will be invalidated should the actor be reported as Terminated.
+     *
+     * @return Optional ActorRef.
+     */
+    Optional<ActorRef> getWatchActor();
+}
index 0031962..a927c93 100644 (file)
@@ -9,7 +9,7 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import java.io.Serializable;
 
-public final class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
+public final class BucketImpl<T extends BucketData<T>> implements Bucket<T>, Serializable {
     private static final long serialVersionUID = 294779770032719196L;
 
     private Long version = System.currentTimeMillis();
index 7ae091d..b4af0ad 100644 (file)
@@ -11,11 +11,15 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 import akka.actor.ActorRef;
 import akka.actor.ActorRefProvider;
 import akka.actor.Address;
+import akka.actor.Terminated;
 import akka.cluster.ClusterActorRefProvider;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
@@ -39,7 +43,7 @@ import org.opendaylight.controller.utils.ConditionalProbe;
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
     /**
      * Bucket owned by the node.
      */
@@ -55,6 +59,12 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
      */
     private final Map<Address, Long> versions = new HashMap<>();
 
+    /**
+     * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
+     * once, possibly being tied to multiple addresses (and by extension, buckets).
+     */
+    private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
+
     /**
      * Cluster address for this node.
      */
@@ -95,6 +105,8 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
         } else if (message instanceof RemoveRemoteBucket) {
             removeBucket(((RemoveRemoteBucket) message).getAddress());
+        } else if (message instanceof Terminated) {
+            actorTerminated((Terminated) message);
         } else if (message instanceof GetAllBuckets) {
             // GetAllBuckets is used only for unit tests.
             receiveGetAllBuckets();
@@ -198,29 +210,39 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
 
         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
         for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
-            if (selfAddress.equals(entry.getKey())) {
+            final Address addr = entry.getKey();
+
+            if (selfAddress.equals(addr)) {
                 // Remote cannot update our bucket
                 continue;
             }
 
             final Bucket<T> receivedBucket = entry.getValue();
             if (receivedBucket == null) {
-                LOG.debug("Ignoring null bucket from {}", entry.getKey());
+                LOG.debug("Ignoring null bucket from {}", addr);
                 continue;
             }
 
             // update only if remote version is newer
             final long remoteVersion = receivedBucket.getVersion();
-            final Long localVersion = versions.get(entry.getKey());
+            final Long localVersion = versions.get(addr);
             if (localVersion != null && remoteVersion <= localVersion.longValue()) {
-                LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
+                LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
                     remoteVersion);
                 continue;
             }
+            newBuckets.put(addr, receivedBucket);
+            versions.put(addr, remoteVersion);
+            final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
+
+            // Deal with DeathWatch subscriptions
+            final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
+            final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
+            if (!curRef.equals(prevRef)) {
+                prevRef.ifPresent(ref -> removeWatch(addr, ref));
+                curRef.ifPresent(ref -> addWatch(addr, ref));
+            }
 
-            newBuckets.put(entry.getKey(), receivedBucket);
-            remoteBuckets.put(entry.getKey(), receivedBucket);
-            versions.put(entry.getKey(), remoteVersion);
             LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
         }
 
@@ -229,10 +251,40 @@ public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMe
         onBucketsUpdated(newBuckets);
     }
 
-    private void removeBucket(final Address address) {
-        final Bucket<T> bucket = remoteBuckets.remove(address);
+    private void addWatch(final Address addr, final ActorRef ref) {
+        if (!watchedActors.containsKey(ref)) {
+            getContext().watch(ref);
+            LOG.debug("Watching {}", ref);
+        }
+        watchedActors.put(ref, addr);
+    }
+
+    private void removeWatch(final Address addr, final ActorRef ref) {
+        watchedActors.remove(ref, addr);
+        if (!watchedActors.containsKey(ref)) {
+            getContext().unwatch(ref);
+            LOG.debug("No longer watching {}", ref);
+        }
+    }
+
+    private void removeBucket(final Address addr) {
+        final Bucket<T> bucket = remoteBuckets.remove(addr);
         if (bucket != null) {
-            onBucketRemoved(address, bucket);
+            bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+            onBucketRemoved(addr, bucket);
+        }
+    }
+
+    private void actorTerminated(final Terminated message) {
+        LOG.info("Actor termination {} received", message);
+
+        for (Address addr : watchedActors.removeAll(message.getActor())) {
+            versions.remove(addr);
+            final Bucket<T> bucket = remoteBuckets.remove(addr);
+            if (bucket != null) {
+                LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+                onBucketRemoved(addr, bucket);
+            }
         }
     }
 
index 33b3f6e..2c47c4e 100644 (file)
@@ -275,7 +275,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param envelope contains buckets from a remote gossiper
      */
     @VisibleForTesting
-    <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
+    <T extends BucketData<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
@@ -291,7 +291,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param buckets map of Buckets to update
      */
     @VisibleForTesting
-    <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
+    <T extends BucketData<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
         getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
     }
 
index cc64ba4..6f81fe8 100644 (file)
@@ -43,7 +43,7 @@ public class Messages {
             }
         }
 
-        public static class ContainsBuckets<T extends Copier<T>> implements Serializable {
+        public static class ContainsBuckets<T extends BucketData<T>> implements Serializable {
             private static final long serialVersionUID = -4940160367495308286L;
 
             private final Map<Address, Bucket<T>> buckets;
@@ -58,7 +58,7 @@ public class Messages {
             }
         }
 
-        public static final class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> {
+        public static final class GetAllBucketsReply<T extends BucketData<T>> extends ContainsBuckets<T> {
             private static final long serialVersionUID = 1L;
 
             public GetAllBucketsReply(final Map<Address, Bucket<T>> buckets) {
@@ -66,7 +66,7 @@ public class Messages {
             }
         }
 
-        public static final class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T>  {
+        public static final class GetBucketsByMembersReply<T extends BucketData<T>> extends ContainsBuckets<T>  {
             private static final long serialVersionUID = 1L;
 
             public GetBucketsByMembersReply(final Map<Address, Bucket<T>> buckets) {
@@ -102,7 +102,7 @@ public class Messages {
             }
         }
 
-        public static final class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T> {
+        public static final class UpdateRemoteBuckets<T extends BucketData<T>> extends ContainsBuckets<T> {
             private static final long serialVersionUID = 1L;
 
             public UpdateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
@@ -151,7 +151,7 @@ public class Messages {
             }
         }
 
-        public static final class GossipEnvelope<T extends Copier<T>> extends ContainsBuckets<T> {
+        public static final class GossipEnvelope<T extends BucketData<T>> extends ContainsBuckets<T> {
             private static final long serialVersionUID = 8346634072582438818L;
 
             private final Address from;
index ff7365c..a5b0fbe 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
@@ -15,6 +16,7 @@ import akka.testkit.TestActorRef;
 import com.typesafe.config.ConfigFactory;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -30,11 +32,16 @@ public class BucketStoreTest {
      * @author gwu
      *
      */
-    private static class T implements Copier<T> {
+    private static class T implements BucketData<T> {
         @Override
         public T copy() {
             return new T();
         }
+
+        @Override
+        public Optional<ActorRef> getWatchActor() {
+            return Optional.empty();
+        }
     }
 
     private static ActorSystem system;