X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStore.java;h=b4af0adfe5f4160868752aab7e0210a8607c3910;hp=628deb4311cebe1da5ff1b44deb715d933b8b8b2;hb=00634259fd13ebc57f16ad63340e6472a2b6c6f2;hpb=a8cdfe15e97b0ca8f683a2d0aed1b37ab15618e0 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 628deb4311..b4af0adfe5 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -11,11 +11,15 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.ActorRef; import akka.actor.ActorRefProvider; import akka.actor.Address; -import akka.actor.Props; +import akka.actor.Terminated; import akka.cluster.ClusterActorRefProvider; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; @@ -25,108 +29,118 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.utils.ConditionalProbe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A store that syncs its data across nodes in the cluster. * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned. * A node can write ONLY to its bucket. This way, write conflicts are avoided. + * *

- * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)

+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol). * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore> extends AbstractUntypedActorWithMetering { - - private static final Long NO_VERSION = -1L; - - protected final Logger log = LoggerFactory.getLogger(getClass()); - +public class BucketStore> extends AbstractUntypedActorWithMetering { /** - * Bucket owned by the node + * Bucket owned by the node. */ - private final BucketImpl localBucket = new BucketImpl<>(); + private final BucketImpl localBucket; /** - * Buckets ownded by other known nodes in the cluster + * Buckets owned by other known nodes in the cluster. */ private final Map> remoteBuckets = new HashMap<>(); /** - * Bucket version for every known node in the cluster including this node + * Bucket version for every known node in the cluster including this node. */ private final Map versions = new HashMap<>(); /** - * Cluster address for this node + * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored + * once, possibly being tied to multiple addresses (and by extension, buckets). + */ + private final SetMultimap watchedActors = HashMultimap.create(1, 1); + + /** + * Cluster address for this node. */ private Address selfAddress; + // FIXME: should be part of test-specific subclass private ConditionalProbe probe; private final RemoteRpcProviderConfig config; - public BucketStore(){ - config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + public BucketStore(final RemoteRpcProviderConfig config, final T initialData) { + this.config = Preconditions.checkNotNull(config); + this.localBucket = new BucketImpl<>(initialData); } @Override - public void preStart(){ + public void preStart() { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if ( provider instanceof ClusterActorRefProvider) { - getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper"); + if (provider instanceof ClusterActorRefProvider) { + getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper"); } } + @SuppressWarnings("unchecked") @Override - protected void handleReceive(Object message) throws Exception { + protected void handleReceive(final Object message) throws Exception { if (probe != null) { probe.tell(message, getSelf()); } - if (message instanceof ConditionalProbe) { - // The ConditionalProbe is only used for unit tests. - log.info("Received probe {} {}", getSelf(), message); - probe = (ConditionalProbe) message; - // Send back any message to tell the caller we got the probe. - getSender().tell("Got it", getSelf()); - } else if (message instanceof GetAllBuckets) { - receiveGetAllBuckets(); - } else if (message instanceof GetBucketsByMembers) { + if (message instanceof GetBucketsByMembers) { receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { - receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); + receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); + } else if (message instanceof RemoveRemoteBucket) { + removeBucket(((RemoveRemoteBucket) message).getAddress()); + } else if (message instanceof Terminated) { + actorTerminated((Terminated) message); + } else if (message instanceof GetAllBuckets) { + // GetAllBuckets is used only for unit tests. + receiveGetAllBuckets(); + } else if (message instanceof ConditionalProbe) { + // The ConditionalProbe is only used for unit tests. + LOG.info("Received probe {} {}", getSelf(), message); + probe = (ConditionalProbe) message; + // Send back any message to tell the caller we got the probe. + getSender().tell("Got it", getSelf()); } else { - if(log.isDebugEnabled()) { - log.debug("Unhandled message [{}]", message); - } + LOG.debug("Unhandled message [{}]", message); unhandled(message); } } + protected RemoteRpcProviderConfig getConfig() { + return config; + } + /** - * Returns all the buckets the this node knows about, self owned + remote + * Returns all the buckets the this node knows about, self owned + remote. */ - void receiveGetAllBuckets(){ + void receiveGetAllBuckets() { final ActorRef sender = getSender(); - sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf()); + sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf()); } /** - * Helper to collect all known buckets + * Helper to collect all known buckets. * * @return self owned + remote buckets */ - @SuppressWarnings("rawtypes") - Map getAllBuckets(){ - Map all = new HashMap<>(remoteBuckets.size() + 1); + Map> getAllBuckets() { + Map> all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket all.put(selfAddress, new BucketImpl<>(localBucket)); @@ -138,26 +152,24 @@ public class BucketStore> extends AbstractUntypedActorWithMe } /** - * Returns buckets for requested members that this node knows about + * Returns buckets for requested members that this node knows about. * * @param members requested members */ - @SuppressWarnings("rawtypes") - void receiveGetBucketsByMembers(Set

members){ + void receiveGetBucketsByMembers(final Set
members) { final ActorRef sender = getSender(); - Map buckets = getBucketsByMembers(members); - sender.tell(new GetBucketsByMembersReply(buckets), getSelf()); + Map> buckets = getBucketsByMembers(members); + sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf()); } /** - * Helper to collect buckets for requested memebers + * Helper to collect buckets for requested members. * * @param members requested members - * @return buckets for requested memebers + * @return buckets for requested members */ - @SuppressWarnings("rawtypes") - Map getBucketsByMembers(Set
members) { - Map buckets = new HashMap<>(); + Map> getBucketsByMembers(final Set
members) { + Map> buckets = new HashMap<>(); //first add the local bucket if asked if (members.contains(selfAddress)) { @@ -165,7 +177,7 @@ public class BucketStore> extends AbstractUntypedActorWithMe } //then get buckets for requested remote nodes - for (Address address : members){ + for (Address address : members) { if (remoteBuckets.containsKey(address)) { buckets.put(address, remoteBuckets.get(address)); } @@ -175,76 +187,140 @@ public class BucketStore> extends AbstractUntypedActorWithMe } /** - * Returns versions for all buckets known + * Returns versions for all buckets known. */ - void receiveGetBucketVersions(){ + void receiveGetBucketVersions() { final ActorRef sender = getSender(); GetBucketVersionsReply reply = new GetBucketVersionsReply(versions); sender.tell(reply, getSelf()); } /** - * Update local copy of remote buckets where local copy's version is older + * Update local copy of remote buckets where local copy's version is older. * * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - void receiveUpdateRemoteBuckets(Map receivedBuckets){ - log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); - if (receivedBuckets == null || receivedBuckets.isEmpty()) - { - return; //nothing to do + void receiveUpdateRemoteBuckets(final Map> receivedBuckets) { + LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); + if (receivedBuckets == null || receivedBuckets.isEmpty()) { + //nothing to do + return; } - //Remote cant update self's bucket - receivedBuckets.remove(selfAddress); - - for (Map.Entry entry : receivedBuckets.entrySet()){ + final Map> newBuckets = new HashMap<>(receivedBuckets.size()); + for (Entry> entry : receivedBuckets.entrySet()) { + final Address addr = entry.getKey(); - Long localVersion = versions.get(entry.getKey()); - if (localVersion == null) { - localVersion = NO_VERSION; + if (selfAddress.equals(addr)) { + // Remote cannot update our bucket + continue; } - Bucket receivedBucket = entry.getValue(); - + final Bucket receivedBucket = entry.getValue(); if (receivedBucket == null) { + LOG.debug("Ignoring null bucket from {}", addr); continue; } - Long remoteVersion = receivedBucket.getVersion(); - if (remoteVersion == null) { - remoteVersion = NO_VERSION; + // update only if remote version is newer + final long remoteVersion = receivedBucket.getVersion(); + final Long localVersion = versions.get(addr); + if (localVersion != null && remoteVersion <= localVersion.longValue()) { + LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion, + remoteVersion); + continue; } - - //update only if remote version is newer - if ( remoteVersion.longValue() > localVersion.longValue() ) { - remoteBuckets.put(entry.getKey(), receivedBucket); - versions.put(entry.getKey(), remoteVersion); + newBuckets.put(addr, receivedBucket); + versions.put(addr, remoteVersion); + final Bucket prevBucket = remoteBuckets.put(addr, receivedBucket); + + // Deal with DeathWatch subscriptions + final Optional prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty(); + final Optional curRef = receivedBucket.getWatchActor(); + if (!curRef.equals(prevRef)) { + prevRef.ifPresent(ref -> removeWatch(addr, ref)); + curRef.ifPresent(ref -> addWatch(addr, ref)); } + + LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion); + } + + LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + + onBucketsUpdated(newBuckets); + } + + private void addWatch(final Address addr, final ActorRef ref) { + if (!watchedActors.containsKey(ref)) { + getContext().watch(ref); + LOG.debug("Watching {}", ref); + } + watchedActors.put(ref, addr); + } + + private void removeWatch(final Address addr, final ActorRef ref) { + watchedActors.remove(ref, addr); + if (!watchedActors.containsKey(ref)) { + getContext().unwatch(ref); + LOG.debug("No longer watching {}", ref); + } + } + + private void removeBucket(final Address addr) { + final Bucket bucket = remoteBuckets.remove(addr); + if (bucket != null) { + bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref)); + onBucketRemoved(addr, bucket); } + } + + private void actorTerminated(final Terminated message) { + LOG.info("Actor termination {} received", message); - if(log.isDebugEnabled()) { - log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + for (Address addr : watchedActors.removeAll(message.getActor())) { + versions.remove(addr); + final Bucket bucket = remoteBuckets.remove(addr); + if (bucket != null) { + LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr); + onBucketRemoved(addr, bucket); + } } } - protected BucketImpl getLocalBucket() { + /** + * Callback to subclasses invoked when a bucket is removed. + * + * @param address Remote address + * @param bucket Bucket removed + */ + protected void onBucketRemoved(final Address address, final Bucket bucket) { + // Default noop + } + + /** + * Callback to subclasses invoked when the set of remote buckets is updated. + * + * @param newBuckets Map of address to new bucket. Never null, but can be empty. + */ + protected void onBucketsUpdated(final Map> newBuckets) { + // Default noop + } + + public BucketImpl getLocalBucket() { return localBucket; } - protected void updateLocalBucket(T data) { + protected void updateLocalBucket(final T data) { localBucket.setData(data); versions.put(selfAddress, localBucket.getVersion()); } - protected Map> getRemoteBuckets() { + public Map> getRemoteBuckets() { return remoteBuckets; } - @VisibleForTesting - Map getVersions() { + public Map getVersions() { return versions; } }