From 00634259fd13ebc57f16ad63340e6472a2b6c6f2 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 18 Jan 2017 00:09:34 +0100 Subject: [PATCH] BUG-7573: add BucketStore source monitoring Add BucketData interface capture, which exposes an optional ActorRef. If this reference is given for a Bucket's data, the bucket will be tied to the source actor's lifecycle via DeathWatch. If such an actor is not provided, only basic cluster-level monitoring will be done. Change-Id: I794bbf9b360d0c3bf68b29e6869a4f5c7c0d2470 Signed-off-by: Robert Varga --- .../remote/rpc/registry/RoutingTable.java | 10 ++- .../remote/rpc/registry/gossip/Bucket.java | 9 ++- .../rpc/registry/gossip/BucketData.java | 28 +++++++ .../rpc/registry/gossip/BucketImpl.java | 2 +- .../rpc/registry/gossip/BucketStore.java | 74 ++++++++++++++++--- .../remote/rpc/registry/gossip/Gossiper.java | 4 +- .../remote/rpc/registry/gossip/Messages.java | 10 +-- .../rpc/registry/gossip/BucketStoreTest.java | 9 ++- 8 files changed, 123 insertions(+), 23 deletions(-) create mode 100644 opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketData.java diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index 0c68b55aa3..90b069e587 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -12,12 +12,13 @@ import com.google.common.base.Preconditions; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; -import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; +import org.opendaylight.controller.remote.rpc.registry.gossip.BucketData; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier; -public class RoutingTable implements Copier, Serializable { +public class RoutingTable implements BucketData, Serializable { private static final long serialVersionUID = 5592610415175278760L; private final Map, Long> table; @@ -37,6 +38,11 @@ public class RoutingTable implements Copier, Serializable { return new RoutingTable(router, new HashMap<>(table)); } + @Override + public Optional getWatchActor() { + return Optional.of(router); + } + public Set> getRoutes() { return table.keySet(); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java index c35316a35f..4e3e5519fb 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Bucket.java @@ -7,8 +7,15 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; -public interface Bucket> { +import akka.actor.ActorRef; +import java.util.Optional; + +public interface Bucket> { long getVersion(); T getData(); + + default Optional getWatchActor() { + return getData().getWatchActor(); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketData.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketData.java new file mode 100644 index 0000000000..6da3d947c6 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketData.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.remote.rpc.registry.gossip; + +import akka.actor.ActorRef; +import java.util.Optional; + +/** + * Marker interface for data which is able to be held in a {@link Bucket}. + * + * @author Robert Varga + * + * @param Concrete BucketData type + */ +public interface BucketData> extends Copier { + /** + * Return the {@link ActorRef} which should be tracked as the authoritative source of this bucket's data. + * The bucket will be invalidated should the actor be reported as Terminated. + * + * @return Optional ActorRef. + */ + Optional getWatchActor(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java index 0031962f82..a927c93533 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -9,7 +9,7 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import java.io.Serializable; -public final class BucketImpl> implements Bucket, Serializable { +public final class BucketImpl> implements Bucket, Serializable { private static final long serialVersionUID = 294779770032719196L; private Long version = System.currentTimeMillis(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 7ae091d199..b4af0adfe5 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -11,11 +11,15 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.ActorRef; import akka.actor.ActorRefProvider; import akka.actor.Address; +import akka.actor.Terminated; import akka.cluster.ClusterActorRefProvider; import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; @@ -39,7 +43,7 @@ import org.opendaylight.controller.utils.ConditionalProbe; * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore> extends AbstractUntypedActorWithMetering { +public class BucketStore> extends AbstractUntypedActorWithMetering { /** * Bucket owned by the node. */ @@ -55,6 +59,12 @@ public class BucketStore> extends AbstractUntypedActorWithMe */ private final Map versions = new HashMap<>(); + /** + * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored + * once, possibly being tied to multiple addresses (and by extension, buckets). + */ + private final SetMultimap watchedActors = HashMultimap.create(1, 1); + /** * Cluster address for this node. */ @@ -95,6 +105,8 @@ public class BucketStore> extends AbstractUntypedActorWithMe receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); } else if (message instanceof RemoveRemoteBucket) { removeBucket(((RemoveRemoteBucket) message).getAddress()); + } else if (message instanceof Terminated) { + actorTerminated((Terminated) message); } else if (message instanceof GetAllBuckets) { // GetAllBuckets is used only for unit tests. receiveGetAllBuckets(); @@ -198,29 +210,39 @@ public class BucketStore> extends AbstractUntypedActorWithMe final Map> newBuckets = new HashMap<>(receivedBuckets.size()); for (Entry> entry : receivedBuckets.entrySet()) { - if (selfAddress.equals(entry.getKey())) { + final Address addr = entry.getKey(); + + if (selfAddress.equals(addr)) { // Remote cannot update our bucket continue; } final Bucket receivedBucket = entry.getValue(); if (receivedBucket == null) { - LOG.debug("Ignoring null bucket from {}", entry.getKey()); + LOG.debug("Ignoring null bucket from {}", addr); continue; } // update only if remote version is newer final long remoteVersion = receivedBucket.getVersion(); - final Long localVersion = versions.get(entry.getKey()); + final Long localVersion = versions.get(addr); if (localVersion != null && remoteVersion <= localVersion.longValue()) { - LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion, + LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion, remoteVersion); continue; } + newBuckets.put(addr, receivedBucket); + versions.put(addr, remoteVersion); + final Bucket prevBucket = remoteBuckets.put(addr, receivedBucket); + + // Deal with DeathWatch subscriptions + final Optional prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty(); + final Optional curRef = receivedBucket.getWatchActor(); + if (!curRef.equals(prevRef)) { + prevRef.ifPresent(ref -> removeWatch(addr, ref)); + curRef.ifPresent(ref -> addWatch(addr, ref)); + } - newBuckets.put(entry.getKey(), receivedBucket); - remoteBuckets.put(entry.getKey(), receivedBucket); - versions.put(entry.getKey(), remoteVersion); LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion); } @@ -229,10 +251,40 @@ public class BucketStore> extends AbstractUntypedActorWithMe onBucketsUpdated(newBuckets); } - private void removeBucket(final Address address) { - final Bucket bucket = remoteBuckets.remove(address); + private void addWatch(final Address addr, final ActorRef ref) { + if (!watchedActors.containsKey(ref)) { + getContext().watch(ref); + LOG.debug("Watching {}", ref); + } + watchedActors.put(ref, addr); + } + + private void removeWatch(final Address addr, final ActorRef ref) { + watchedActors.remove(ref, addr); + if (!watchedActors.containsKey(ref)) { + getContext().unwatch(ref); + LOG.debug("No longer watching {}", ref); + } + } + + private void removeBucket(final Address addr) { + final Bucket bucket = remoteBuckets.remove(addr); if (bucket != null) { - onBucketRemoved(address, bucket); + bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref)); + onBucketRemoved(addr, bucket); + } + } + + private void actorTerminated(final Terminated message) { + LOG.info("Actor termination {} received", message); + + for (Address addr : watchedActors.removeAll(message.getActor())) { + versions.remove(addr); + final Bucket bucket = remoteBuckets.remove(addr); + if (bucket != null) { + LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr); + onBucketRemoved(addr, bucket); + } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 33b3f6e813..2c47c4e2a9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -275,7 +275,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param envelope contains buckets from a remote gossiper */ @VisibleForTesting - > void receiveGossip(final GossipEnvelope envelope) { + > void receiveGossip(final GossipEnvelope envelope) { //TODO: Add more validations if (!selfAddress.equals(envelope.to())) { LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); @@ -291,7 +291,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param buckets map of Buckets to update */ @VisibleForTesting - > void updateRemoteBuckets(final Map> buckets) { + > void updateRemoteBuckets(final Map> buckets) { getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java index cc64ba4509..6f81fe8b32 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java @@ -43,7 +43,7 @@ public class Messages { } } - public static class ContainsBuckets> implements Serializable { + public static class ContainsBuckets> implements Serializable { private static final long serialVersionUID = -4940160367495308286L; private final Map> buckets; @@ -58,7 +58,7 @@ public class Messages { } } - public static final class GetAllBucketsReply> extends ContainsBuckets { + public static final class GetAllBucketsReply> extends ContainsBuckets { private static final long serialVersionUID = 1L; public GetAllBucketsReply(final Map> buckets) { @@ -66,7 +66,7 @@ public class Messages { } } - public static final class GetBucketsByMembersReply> extends ContainsBuckets { + public static final class GetBucketsByMembersReply> extends ContainsBuckets { private static final long serialVersionUID = 1L; public GetBucketsByMembersReply(final Map> buckets) { @@ -102,7 +102,7 @@ public class Messages { } } - public static final class UpdateRemoteBuckets> extends ContainsBuckets { + public static final class UpdateRemoteBuckets> extends ContainsBuckets { private static final long serialVersionUID = 1L; public UpdateRemoteBuckets(final Map> buckets) { @@ -151,7 +151,7 @@ public class Messages { } } - public static final class GossipEnvelope> extends ContainsBuckets { + public static final class GossipEnvelope> extends ContainsBuckets { private static final long serialVersionUID = 8346634072582438818L; private final Address from; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java index ff7365cbac..a5b0fbedda 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; +import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; @@ -15,6 +16,7 @@ import akka.testkit.TestActorRef; import com.typesafe.config.ConfigFactory; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -30,11 +32,16 @@ public class BucketStoreTest { * @author gwu * */ - private static class T implements Copier { + private static class T implements BucketData { @Override public T copy() { return new T(); } + + @Override + public Optional getWatchActor() { + return Optional.empty(); + } } private static ActorSystem system; -- 2.36.6