X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStore.java;h=70f4053723d4c426be572de0c9c69fbb3e6950c8;hb=d04b71990a802071a786fe8f0df57bc4adbdec3f;hp=23cbaca32f483f6af6ba8046e34d9233d326d6c5;hpb=d6f1e7790157461553b26ec82d246e68b62aad6b;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 23cbaca32f..70f4053723 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -8,159 +8,203 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; +import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; + import akka.actor.ActorRef; +import akka.actor.ActorRefProvider; import akka.actor.Address; -import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.cluster.Cluster; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import org.opendaylight.controller.utils.ConditionalProbe; - +import akka.actor.PoisonPill; +import akka.actor.Terminated; +import akka.cluster.ClusterActorRefProvider; +import akka.persistence.DeleteSnapshotsFailure; +import akka.persistence.DeleteSnapshotsSuccess; +import akka.persistence.RecoveryCompleted; +import akka.persistence.SaveSnapshotFailure; +import akka.persistence.SaveSnapshotSuccess; +import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; /** * A store that syncs its data across nodes in the cluster. * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned. * A node can write ONLY to its bucket. This way, write conflicts are avoided. + * *

- * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)

+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol). * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore extends UntypedActor { - - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - +public class BucketStore> extends AbstractUntypedPersistentActorWithMetering { /** - * Bucket owned by the node + * Buckets owned by other known nodes in the cluster. */ - private BucketImpl localBucket = new BucketImpl();; + private final Map> remoteBuckets = new HashMap<>(); /** - * Buckets ownded by other known nodes in the cluster + * Bucket version for every known node in the cluster including this node. */ - private ConcurrentMap remoteBuckets = new ConcurrentHashMap<>(); + private final Map versions = new HashMap<>(); /** - * Bucket version for every known node in the cluster including this node + * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored + * once, possibly being tied to multiple addresses (and by extension, buckets). */ - private ConcurrentMap versions = new ConcurrentHashMap<>(); + private final SetMultimap watchedActors = HashMultimap.create(1, 1); + + private final RemoteRpcProviderConfig config; + private final String persistenceId; /** - * Cluster address for this node + * Cluster address for this node. */ - private final Address selfAddress = Cluster.get(getContext().system()).selfAddress(); + private Address selfAddress; /** - * Our private gossiper + * Bucket owned by the node. Initialized during recovery (due to incarnation number). */ - private ActorRef gossiper; - - private ConditionalProbe probe; - - public BucketStore(){ - gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper"); + private LocalBucket localBucket; + private T initialData; + private Integer incarnation; + private boolean persisting; + + public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) { + this.config = Preconditions.checkNotNull(config); + this.initialData = Preconditions.checkNotNull(initialData); + this.persistenceId = Preconditions.checkNotNull(persistenceId); } - /** - * This constructor is useful for testing. - * TODO: Pass Props instead of ActorRef - * - * @param gossiper - */ - public BucketStore(ActorRef gossiper){ - this.gossiper = gossiper; + @Override + public String persistenceId() { + return persistenceId; } @Override - public void onReceive(Object message) throws Exception { + public void preStart() { + ActorRefProvider provider = getContext().provider(); + selfAddress = provider.getDefaultAddress(); - log.debug("Received message: node[{}], message[{}]", selfAddress, - message); + if (provider instanceof ClusterActorRefProvider) { + getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper"); + } + } - if (probe != null) { + @SuppressWarnings("unchecked") + @Override + protected void handleCommand(final Object message) throws Exception { + if (message instanceof GetAllBuckets) { + // GetAllBuckets is used only in testing + receiveGetAllBuckets(); + return; + } - probe.tell(message, getSelf()); + if (persisting) { + handleSnapshotMessage(message); + return; } - if (message instanceof ConditionalProbe) { - log.info("Received probe {} {}", getSelf(), message); - probe = (ConditionalProbe) message; - } else if (message instanceof UpdateBucket) { - receiveUpdateBucket(((UpdateBucket) message).getBucket()); - } else if (message instanceof GetAllBuckets) { - receiveGetAllBucket(); - } else if (message instanceof GetLocalBucket) { - receiveGetLocalBucket(); - } else if (message instanceof GetBucketsByMembers) { - receiveGetBucketsByMembers( - ((GetBucketsByMembers) message).getMembers()); + if (message instanceof GetBucketsByMembers) { + receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { - receiveUpdateRemoteBuckets( - ((UpdateRemoteBuckets) message).getBuckets()); + receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); + } else if (message instanceof RemoveRemoteBucket) { + removeBucket(((RemoveRemoteBucket) message).getAddress()); + } else if (message instanceof Terminated) { + actorTerminated((Terminated) message); + } else if (message instanceof DeleteSnapshotsSuccess) { + LOG.debug("{}: got command: {}", persistenceId(), message); + } else if (message instanceof DeleteSnapshotsFailure) { + LOG.warn("{}: failed to delete prior snapshots", persistenceId(), + ((DeleteSnapshotsFailure) message).cause()); } else { - log.debug("Unhandled message [{}]", message); + LOG.debug("Unhandled message [{}]", message); unhandled(message); } - } - /** - * Returns a copy of bucket owned by this node - */ - private void receiveGetLocalBucket() { - final ActorRef sender = getSender(); - GetLocalBucketReply reply = new GetLocalBucketReply(localBucket); - sender.tell(reply, getSelf()); + private void handleSnapshotMessage(final Object message) { + if (message instanceof SaveSnapshotFailure) { + LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause()); + persisting = false; + self().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } else if (message instanceof SaveSnapshotSuccess) { + LOG.debug("{}: got command: {}", persistenceId(), message); + SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message; + deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(), + saved.metadata().timestamp() - 1, 0L, 0L)); + persisting = false; + unstash(); + } else { + LOG.debug("{}: stashing command {}", persistenceId(), message); + stash(); + } } - /** - * Updates the bucket owned by this node - * - * @param updatedBucket - */ - void receiveUpdateBucket(Bucket updatedBucket){ + @Override + protected void handleRecover(final Object message) throws Exception { + if (message instanceof RecoveryCompleted) { + if (incarnation != null) { + incarnation = incarnation + 1; + } else { + incarnation = 0; + } - localBucket = (BucketImpl) updatedBucket; - versions.put(selfAddress, localBucket.getVersion()); + this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData); + initialData = null; + LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation); + persisting = true; + saveSnapshot(incarnation); + } else if (message instanceof SnapshotOffer) { + incarnation = (Integer) ((SnapshotOffer)message).snapshot(); + LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation); + } else { + LOG.warn("{}: ignoring recovery message {}", persistenceId(), message); + } + } + + protected RemoteRpcProviderConfig getConfig() { + return config; } /** - * Returns all the buckets the this node knows about, self owned + remote + * Returns all the buckets the this node knows about, self owned + remote. */ - void receiveGetAllBucket(){ + @VisibleForTesting + protected void receiveGetAllBuckets() { final ActorRef sender = getSender(); - sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf()); + sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf()); } /** - * Helper to collect all known buckets + * Helper to collect all known buckets. * * @return self owned + remote buckets */ - Map getAllBuckets(){ - Map all = new HashMap<>(remoteBuckets.size() + 1); + Map> getAllBuckets() { + Map> all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket - all.put(selfAddress, localBucket); + all.put(selfAddress, getLocalBucket().snapshot()); //then get all remote buckets all.putAll(remoteBuckets); @@ -169,114 +213,196 @@ public class BucketStore extends UntypedActor { } /** - * Returns buckets for requested members that this node knows about + * Returns buckets for requested members that this node knows about. * * @param members requested members */ - void receiveGetBucketsByMembers(Set

members){ + void receiveGetBucketsByMembers(final Set
members) { final ActorRef sender = getSender(); - Map buckets = getBucketsByMembers(members); - sender.tell(new GetBucketsByMembersReply(buckets), getSelf()); + Map> buckets = getBucketsByMembers(members); + sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf()); } /** - * Helper to collect buckets for requested memebers + * Helper to collect buckets for requested members. * * @param members requested members - * @return buckets for requested memebers + * @return buckets for requested members */ - Map getBucketsByMembers(Set
members) { - Map buckets = new HashMap<>(); + Map> getBucketsByMembers(final Set
members) { + Map> buckets = new HashMap<>(); //first add the local bucket if asked - if (members.contains(selfAddress)) - buckets.put(selfAddress, localBucket); + if (members.contains(selfAddress)) { + buckets.put(selfAddress, getLocalBucket().snapshot()); + } //then get buckets for requested remote nodes - for (Address address : members){ - if (remoteBuckets.containsKey(address)) + for (Address address : members) { + if (remoteBuckets.containsKey(address)) { buckets.put(address, remoteBuckets.get(address)); + } } return buckets; } /** - * Returns versions for all buckets known + * Returns versions for all buckets known. */ - void receiveGetBucketVersions(){ + void receiveGetBucketVersions() { final ActorRef sender = getSender(); GetBucketVersionsReply reply = new GetBucketVersionsReply(versions); sender.tell(reply, getSelf()); } /** - * Update local copy of remote buckets where local copy's version is older + * Update local copy of remote buckets where local copy's version is older. * * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ - void receiveUpdateRemoteBuckets(Map receivedBuckets){ + void receiveUpdateRemoteBuckets(final Map> receivedBuckets) { + LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); + if (receivedBuckets == null || receivedBuckets.isEmpty()) { + //nothing to do + return; + } - if (receivedBuckets == null || receivedBuckets.isEmpty()) - return; //nothing to do + final Map> newBuckets = new HashMap<>(receivedBuckets.size()); + for (Entry> entry : receivedBuckets.entrySet()) { + final Address addr = entry.getKey(); - //Remote cant update self's bucket - receivedBuckets.remove(selfAddress); + if (selfAddress.equals(addr)) { + // Remote cannot update our bucket + continue; + } - for (Map.Entry entry : receivedBuckets.entrySet()){ + final Bucket receivedBucket = entry.getValue(); + if (receivedBucket == null) { + LOG.debug("Ignoring null bucket from {}", addr); + continue; + } - Long localVersion = versions.get(entry.getKey()); - if (localVersion == null) localVersion = -1L; + // update only if remote version is newer + final long remoteVersion = receivedBucket.getVersion(); + final Long localVersion = versions.get(addr); + if (localVersion != null && remoteVersion <= localVersion.longValue()) { + LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion, + remoteVersion); + continue; + } + newBuckets.put(addr, receivedBucket); + versions.put(addr, remoteVersion); + final Bucket prevBucket = remoteBuckets.put(addr, receivedBucket); + + // Deal with DeathWatch subscriptions + final Optional prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty(); + final Optional curRef = receivedBucket.getWatchActor(); + if (!curRef.equals(prevRef)) { + prevRef.ifPresent(ref -> removeWatch(addr, ref)); + curRef.ifPresent(ref -> addWatch(addr, ref)); + } - Bucket receivedBucket = entry.getValue(); + LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion); + } - if (receivedBucket == null) - continue; + LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); - Long remoteVersion = receivedBucket.getVersion(); - if (remoteVersion == null) remoteVersion = -1L; + onBucketsUpdated(newBuckets); + } - //update only if remote version is newer - if ( remoteVersion > localVersion ) { - remoteBuckets.put(entry.getKey(), receivedBucket); - versions.put(entry.getKey(), remoteVersion); - } + private void addWatch(final Address addr, final ActorRef ref) { + if (!watchedActors.containsKey(ref)) { + getContext().watch(ref); + LOG.debug("Watching {}", ref); } + watchedActors.put(ref, addr); + } - log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + private void removeWatch(final Address addr, final ActorRef ref) { + watchedActors.remove(ref, addr); + if (!watchedActors.containsKey(ref)) { + getContext().unwatch(ref); + LOG.debug("No longer watching {}", ref); + } } - /// - ///Getter Setters - /// + private void removeBucket(final Address addr) { + final Bucket bucket = remoteBuckets.remove(addr); + if (bucket != null) { + bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref)); + onBucketRemoved(addr, bucket); + } + } - BucketImpl getLocalBucket() { - return localBucket; + private void actorTerminated(final Terminated message) { + LOG.info("Actor termination {} received", message); + + for (Address addr : watchedActors.removeAll(message.getActor())) { + versions.remove(addr); + final Bucket bucket = remoteBuckets.remove(addr); + if (bucket != null) { + LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr); + onBucketRemoved(addr, bucket); + } + } } - void setLocalBucket(BucketImpl localBucket) { - this.localBucket = localBucket; + /** + * Callback to subclasses invoked when a bucket is removed. + * + * @param address Remote address + * @param bucket Bucket removed + */ + protected void onBucketRemoved(final Address address, final Bucket bucket) { + // Default noop } - ConcurrentMap getRemoteBuckets() { - return remoteBuckets; + /** + * Callback to subclasses invoked when the set of remote buckets is updated. + * + * @param newBuckets Map of address to new bucket. Never null, but can be empty. + */ + protected void onBucketsUpdated(final Map> newBuckets) { + // Default noop } - void setRemoteBuckets(ConcurrentMap remoteBuckets) { - this.remoteBuckets = remoteBuckets; + @VisibleForTesting + protected boolean isPersisting() { + return persisting; } - ConcurrentMap getVersions() { - return versions; + public T getLocalData() { + return getLocalBucket().getData(); + } + + private LocalBucket getLocalBucket() { + Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed"); + return localBucket; } - void setVersions(ConcurrentMap versions) { - this.versions = versions; + protected void updateLocalBucket(final T data) { + final LocalBucket local = getLocalBucket(); + final boolean bumpIncarnation = local.setData(data); + versions.put(selfAddress, local.getVersion()); + + if (bumpIncarnation) { + LOG.debug("Version wrapped. incrementing incarnation"); + + Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); + incarnation = incarnation + 1; + + persisting = true; + saveSnapshot(incarnation); + } } - Address getSelfAddress() { - return selfAddress; + public Map> getRemoteBuckets() { + return remoteBuckets; } + public Map getVersions() { + return versions; + } }