X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStore.java;h=b4af0adfe5f4160868752aab7e0210a8607c3910;hp=81e6a9ccc3c253e0e409e37e75f8d7ad1a4af888;hb=00634259fd13ebc57f16ad63340e6472a2b6c6f2;hpb=9ddc65e1ddae50f691566cd9382707679436c055 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 81e6a9ccc3..b4af0adfe5 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -11,11 +11,15 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.ActorRef; import akka.actor.ActorRefProvider; import akka.actor.Address; -import akka.actor.Props; +import akka.actor.Terminated; import akka.cluster.ClusterActorRefProvider; import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; @@ -25,10 +29,9 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.utils.ConditionalProbe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A store that syncs its data across nodes in the cluster. @@ -40,19 +43,14 @@ import org.slf4j.LoggerFactory; * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore> extends AbstractUntypedActorWithMetering { - - private static final Long NO_VERSION = -1L; - - protected final Logger log = LoggerFactory.getLogger(getClass()); - +public class BucketStore> extends AbstractUntypedActorWithMetering { /** * Bucket owned by the node. */ - private final BucketImpl localBucket = new BucketImpl<>(); + private final BucketImpl localBucket; /** - * Buckets ownded by other known nodes in the cluster. + * Buckets owned by other known nodes in the cluster. */ private final Map> remoteBuckets = new HashMap<>(); @@ -61,17 +59,25 @@ public class BucketStore> extends AbstractUntypedActorWithMe */ private final Map versions = new HashMap<>(); + /** + * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored + * once, possibly being tied to multiple addresses (and by extension, buckets). + */ + private final SetMultimap watchedActors = HashMultimap.create(1, 1); + /** * Cluster address for this node. */ private Address selfAddress; + // FIXME: should be part of test-specific subclass private ConditionalProbe probe; private final RemoteRpcProviderConfig config; - public BucketStore(RemoteRpcProviderConfig config) { + public BucketStore(final RemoteRpcProviderConfig config, final T initialData) { this.config = Preconditions.checkNotNull(config); + this.localBucket = new BucketImpl<>(initialData); } @Override @@ -79,34 +85,39 @@ public class BucketStore> extends AbstractUntypedActorWithMe ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if ( provider instanceof ClusterActorRefProvider) { - getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper"); + if (provider instanceof ClusterActorRefProvider) { + getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper"); } } @SuppressWarnings("unchecked") @Override - protected void handleReceive(Object message) throws Exception { + protected void handleReceive(final Object message) throws Exception { if (probe != null) { probe.tell(message, getSelf()); } - if (message instanceof ConditionalProbe) { - // The ConditionalProbe is only used for unit tests. - log.info("Received probe {} {}", getSelf(), message); - probe = (ConditionalProbe) message; - // Send back any message to tell the caller we got the probe. - getSender().tell("Got it", getSelf()); - } else if (message instanceof GetAllBuckets) { - receiveGetAllBuckets(); - } else if (message instanceof GetBucketsByMembers) { + if (message instanceof GetBucketsByMembers) { receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); + } else if (message instanceof RemoveRemoteBucket) { + removeBucket(((RemoveRemoteBucket) message).getAddress()); + } else if (message instanceof Terminated) { + actorTerminated((Terminated) message); + } else if (message instanceof GetAllBuckets) { + // GetAllBuckets is used only for unit tests. + receiveGetAllBuckets(); + } else if (message instanceof ConditionalProbe) { + // The ConditionalProbe is only used for unit tests. + LOG.info("Received probe {} {}", getSelf(), message); + probe = (ConditionalProbe) message; + // Send back any message to tell the caller we got the probe. + getSender().tell("Got it", getSelf()); } else { - log.debug("Unhandled message [{}]", message); + LOG.debug("Unhandled message [{}]", message); unhandled(message); } } @@ -145,7 +156,7 @@ public class BucketStore> extends AbstractUntypedActorWithMe * * @param members requested members */ - void receiveGetBucketsByMembers(Set
members) { + void receiveGetBucketsByMembers(final Set
members) { final ActorRef sender = getSender(); Map> buckets = getBucketsByMembers(members); sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf()); @@ -157,7 +168,7 @@ public class BucketStore> extends AbstractUntypedActorWithMe * @param members requested members * @return buckets for requested members */ - Map> getBucketsByMembers(Set
members) { + Map> getBucketsByMembers(final Set
members) { Map> buckets = new HashMap<>(); //first add the local bucket if asked @@ -190,53 +201,117 @@ public class BucketStore> extends AbstractUntypedActorWithMe * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ - void receiveUpdateRemoteBuckets(Map> receivedBuckets) { - log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); + void receiveUpdateRemoteBuckets(final Map> receivedBuckets) { + LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); if (receivedBuckets == null || receivedBuckets.isEmpty()) { - return; //nothing to do + //nothing to do + return; } - //Remote cant update self's bucket - receivedBuckets.remove(selfAddress); + final Map> newBuckets = new HashMap<>(receivedBuckets.size()); + for (Entry> entry : receivedBuckets.entrySet()) { + final Address addr = entry.getKey(); - for (Map.Entry> entry : receivedBuckets.entrySet()) { - - Long localVersion = versions.get(entry.getKey()); - if (localVersion == null) { - localVersion = NO_VERSION; + if (selfAddress.equals(addr)) { + // Remote cannot update our bucket + continue; } - Bucket receivedBucket = entry.getValue(); - + final Bucket receivedBucket = entry.getValue(); if (receivedBucket == null) { + LOG.debug("Ignoring null bucket from {}", addr); continue; } - Long remoteVersion = receivedBucket.getVersion(); - if (remoteVersion == null) { - remoteVersion = NO_VERSION; + // update only if remote version is newer + final long remoteVersion = receivedBucket.getVersion(); + final Long localVersion = versions.get(addr); + if (localVersion != null && remoteVersion <= localVersion.longValue()) { + LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion, + remoteVersion); + continue; } - - //update only if remote version is newer - if ( remoteVersion.longValue() > localVersion.longValue() ) { - remoteBuckets.put(entry.getKey(), receivedBucket); - versions.put(entry.getKey(), remoteVersion); + newBuckets.put(addr, receivedBucket); + versions.put(addr, remoteVersion); + final Bucket prevBucket = remoteBuckets.put(addr, receivedBucket); + + // Deal with DeathWatch subscriptions + final Optional prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty(); + final Optional curRef = receivedBucket.getWatchActor(); + if (!curRef.equals(prevRef)) { + prevRef.ifPresent(ref -> removeWatch(addr, ref)); + curRef.ifPresent(ref -> addWatch(addr, ref)); } + + LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion); + } + + LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + + onBucketsUpdated(newBuckets); + } + + private void addWatch(final Address addr, final ActorRef ref) { + if (!watchedActors.containsKey(ref)) { + getContext().watch(ref); + LOG.debug("Watching {}", ref); } + watchedActors.put(ref, addr); + } - log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + private void removeWatch(final Address addr, final ActorRef ref) { + watchedActors.remove(ref, addr); + if (!watchedActors.containsKey(ref)) { + getContext().unwatch(ref); + LOG.debug("No longer watching {}", ref); + } + } - onBucketsUpdated(); + private void removeBucket(final Address addr) { + final Bucket bucket = remoteBuckets.remove(addr); + if (bucket != null) { + bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref)); + onBucketRemoved(addr, bucket); + } } - protected void onBucketsUpdated() { + private void actorTerminated(final Terminated message) { + LOG.info("Actor termination {} received", message); + + for (Address addr : watchedActors.removeAll(message.getActor())) { + versions.remove(addr); + final Bucket bucket = remoteBuckets.remove(addr); + if (bucket != null) { + LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr); + onBucketRemoved(addr, bucket); + } + } + } + + /** + * Callback to subclasses invoked when a bucket is removed. + * + * @param address Remote address + * @param bucket Bucket removed + */ + protected void onBucketRemoved(final Address address, final Bucket bucket) { + // Default noop + } + + /** + * Callback to subclasses invoked when the set of remote buckets is updated. + * + * @param newBuckets Map of address to new bucket. Never null, but can be empty. + */ + protected void onBucketsUpdated(final Map> newBuckets) { + // Default noop } public BucketImpl getLocalBucket() { return localBucket; } - protected void updateLocalBucket(T data) { + protected void updateLocalBucket(final T data) { localBucket.setData(data); versions.put(selfAddress, localBucket.getVersion()); }