import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.Address;
+import akka.actor.Terminated;
import akka.cluster.ClusterActorRefProvider;
import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
/**
* Bucket owned by the node
*/
private final Map<Address, Long> versions = new HashMap<>();
/**
- * Cluster address for this node
+ * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
+ * once, possibly being tied to multiple addresses (and by extension, buckets).
+ */
+ private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
+
+ /**
+ * Cluster address for this node.
*/
private Address selfAddress;
receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
} else if (message instanceof RemoveRemoteBucket) {
removeBucket(((RemoveRemoteBucket) message).getAddress());
+ } else if (message instanceof Terminated) {
+ actorTerminated((Terminated) message);
} else if (message instanceof GetAllBuckets) {
// GetAllBuckets is used only for unit tests.
receiveGetAllBuckets();
final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
- if (selfAddress.equals(entry.getKey())) {
+ final Address addr = entry.getKey();
+
+ if (selfAddress.equals(addr)) {
// Remote cannot update our bucket
continue;
}
final Bucket<T> receivedBucket = entry.getValue();
if (receivedBucket == null) {
- LOG.debug("Ignoring null bucket from {}", entry.getKey());
+ LOG.debug("Ignoring null bucket from {}", addr);
continue;
}
// update only if remote version is newer
final long remoteVersion = receivedBucket.getVersion();
- final Long localVersion = versions.get(entry.getKey());
+ final Long localVersion = versions.get(addr);
if (localVersion != null && remoteVersion <= localVersion.longValue()) {
- LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
+ LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
remoteVersion);
continue;
}
+ newBuckets.put(addr, receivedBucket);
+ versions.put(addr, remoteVersion);
+ final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
+
+ // Deal with DeathWatch subscriptions
+ final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
+ final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
+ if (!curRef.equals(prevRef)) {
+ prevRef.ifPresent(ref -> removeWatch(addr, ref));
+ curRef.ifPresent(ref -> addWatch(addr, ref));
+ }
- newBuckets.put(entry.getKey(), receivedBucket);
- remoteBuckets.put(entry.getKey(), receivedBucket);
- versions.put(entry.getKey(), remoteVersion);
LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
}
onBucketsUpdated(newBuckets);
}
- private void removeBucket(final Address address) {
- final Bucket<T> bucket = remoteBuckets.remove(address);
+ private void addWatch(final Address addr, final ActorRef ref) {
+ if (!watchedActors.containsKey(ref)) {
+ getContext().watch(ref);
+ LOG.debug("Watching {}", ref);
+ }
+ watchedActors.put(ref, addr);
+ }
+
+ private void removeWatch(final Address addr, final ActorRef ref) {
+ watchedActors.remove(ref, addr);
+ if (!watchedActors.containsKey(ref)) {
+ getContext().unwatch(ref);
+ LOG.debug("No longer watching {}", ref);
+ }
+ }
+
+ private void removeBucket(final Address addr) {
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
if (bucket != null) {
- onBucketRemoved(address, bucket);
+ bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+ onBucketRemoved(addr, bucket);
+ }
+ }
+
+ private void actorTerminated(final Terminated message) {
+ LOG.info("Actor termination {} received", message);
+
+ for (Address addr : watchedActors.removeAll(message.getActor())) {
+ versions.remove(addr);
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
+ if (bucket != null) {
+ LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+ onBucketRemoved(addr, bucket);
+ }
}
}
}
}
- public static class ContainsBuckets<T extends Copier<T>> implements Serializable{
+ public static class ContainsBuckets<T extends BucketData<T>> implements Serializable {
private static final long serialVersionUID = -4940160367495308286L;
private final Map<Address, Bucket<T>> buckets;
}
}
- public static final class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> {
+ public static final class GetAllBucketsReply<T extends BucketData<T>> extends ContainsBuckets<T> {
private static final long serialVersionUID = 1L;
public GetAllBucketsReply(final Map<Address, Bucket<T>> buckets) {
}
}
- public static final class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T> {
+ public static final class GetBucketsByMembersReply<T extends BucketData<T>> extends ContainsBuckets<T> {
private static final long serialVersionUID = 1L;
public GetBucketsByMembersReply(final Map<Address, Bucket<T>> buckets) {
}
}
- public static final class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T> {
+ public static final class UpdateRemoteBuckets<T extends BucketData<T>> extends ContainsBuckets<T> {
private static final long serialVersionUID = 1L;
public UpdateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
}
}
- public static final class GossipEnvelope<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable {
+ public static final class GossipEnvelope<T extends BucketData<T>> extends ContainsBuckets<T> {
private static final long serialVersionUID = 8346634072582438818L;
private final Address from;