X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStore.java;fp=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStore.java;h=0000000000000000000000000000000000000000;hb=5b66dd8f5e3467a07e77b20fe696b29993ce5565;hp=70f4053723d4c426be572de0c9c69fbb3e6950c8;hpb=5fd4213b5bfaf2db21f1b37139f6b98535a872c0;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java deleted file mode 100644 index 70f4053723..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ /dev/null @@ -1,408 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.remote.rpc.registry.gossip; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; - -import akka.actor.ActorRef; -import akka.actor.ActorRefProvider; -import akka.actor.Address; -import akka.actor.PoisonPill; -import akka.actor.Terminated; -import akka.cluster.ClusterActorRefProvider; -import akka.persistence.DeleteSnapshotsFailure; -import akka.persistence.DeleteSnapshotsSuccess; -import akka.persistence.RecoveryCompleted; -import akka.persistence.SaveSnapshotFailure; -import akka.persistence.SaveSnapshotSuccess; -import akka.persistence.SnapshotOffer; -import akka.persistence.SnapshotSelectionCriteria; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.SetMultimap; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; - -/** - * A store that syncs its data across nodes in the cluster. - * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned. - * A node can write ONLY to its bucket. This way, write conflicts are avoided. - * - *

- * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol). - * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. - * - */ -public class BucketStore> extends AbstractUntypedPersistentActorWithMetering { - /** - * Buckets owned by other known nodes in the cluster. - */ - private final Map> remoteBuckets = new HashMap<>(); - - /** - * Bucket version for every known node in the cluster including this node. - */ - private final Map versions = new HashMap<>(); - - /** - * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored - * once, possibly being tied to multiple addresses (and by extension, buckets). - */ - private final SetMultimap watchedActors = HashMultimap.create(1, 1); - - private final RemoteRpcProviderConfig config; - private final String persistenceId; - - /** - * Cluster address for this node. - */ - private Address selfAddress; - - /** - * Bucket owned by the node. Initialized during recovery (due to incarnation number). - */ - private LocalBucket localBucket; - private T initialData; - private Integer incarnation; - private boolean persisting; - - public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) { - this.config = Preconditions.checkNotNull(config); - this.initialData = Preconditions.checkNotNull(initialData); - this.persistenceId = Preconditions.checkNotNull(persistenceId); - } - - @Override - public String persistenceId() { - return persistenceId; - } - - @Override - public void preStart() { - ActorRefProvider provider = getContext().provider(); - selfAddress = provider.getDefaultAddress(); - - if (provider instanceof ClusterActorRefProvider) { - getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper"); - } - } - - @SuppressWarnings("unchecked") - @Override - protected void handleCommand(final Object message) throws Exception { - if (message instanceof GetAllBuckets) { - // GetAllBuckets is used only in testing - receiveGetAllBuckets(); - return; - } - - if (persisting) { - handleSnapshotMessage(message); - return; - } - - if (message instanceof GetBucketsByMembers) { - receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); - } else if (message instanceof GetBucketVersions) { - receiveGetBucketVersions(); - } else if (message instanceof UpdateRemoteBuckets) { - receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); - } else if (message instanceof RemoveRemoteBucket) { - removeBucket(((RemoveRemoteBucket) message).getAddress()); - } else if (message instanceof Terminated) { - actorTerminated((Terminated) message); - } else if (message instanceof DeleteSnapshotsSuccess) { - LOG.debug("{}: got command: {}", persistenceId(), message); - } else if (message instanceof DeleteSnapshotsFailure) { - LOG.warn("{}: failed to delete prior snapshots", persistenceId(), - ((DeleteSnapshotsFailure) message).cause()); - } else { - LOG.debug("Unhandled message [{}]", message); - unhandled(message); - } - } - - private void handleSnapshotMessage(final Object message) { - if (message instanceof SaveSnapshotFailure) { - LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause()); - persisting = false; - self().tell(PoisonPill.getInstance(), ActorRef.noSender()); - } else if (message instanceof SaveSnapshotSuccess) { - LOG.debug("{}: got command: {}", persistenceId(), message); - SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message; - deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(), - saved.metadata().timestamp() - 1, 0L, 0L)); - persisting = false; - unstash(); - } else { - LOG.debug("{}: stashing command {}", persistenceId(), message); - stash(); - } - } - - @Override - protected void handleRecover(final Object message) throws Exception { - if (message instanceof RecoveryCompleted) { - if (incarnation != null) { - incarnation = incarnation + 1; - } else { - incarnation = 0; - } - - this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData); - initialData = null; - LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation); - persisting = true; - saveSnapshot(incarnation); - } else if (message instanceof SnapshotOffer) { - incarnation = (Integer) ((SnapshotOffer)message).snapshot(); - LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation); - } else { - LOG.warn("{}: ignoring recovery message {}", persistenceId(), message); - } - } - - protected RemoteRpcProviderConfig getConfig() { - return config; - } - - /** - * Returns all the buckets the this node knows about, self owned + remote. - */ - @VisibleForTesting - protected void receiveGetAllBuckets() { - final ActorRef sender = getSender(); - sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf()); - } - - /** - * Helper to collect all known buckets. - * - * @return self owned + remote buckets - */ - Map> getAllBuckets() { - Map> all = new HashMap<>(remoteBuckets.size() + 1); - - //first add the local bucket - all.put(selfAddress, getLocalBucket().snapshot()); - - //then get all remote buckets - all.putAll(remoteBuckets); - - return all; - } - - /** - * Returns buckets for requested members that this node knows about. - * - * @param members requested members - */ - void receiveGetBucketsByMembers(final Set

members) { - final ActorRef sender = getSender(); - Map> buckets = getBucketsByMembers(members); - sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf()); - } - - /** - * Helper to collect buckets for requested members. - * - * @param members requested members - * @return buckets for requested members - */ - Map> getBucketsByMembers(final Set
members) { - Map> buckets = new HashMap<>(); - - //first add the local bucket if asked - if (members.contains(selfAddress)) { - buckets.put(selfAddress, getLocalBucket().snapshot()); - } - - //then get buckets for requested remote nodes - for (Address address : members) { - if (remoteBuckets.containsKey(address)) { - buckets.put(address, remoteBuckets.get(address)); - } - } - - return buckets; - } - - /** - * Returns versions for all buckets known. - */ - void receiveGetBucketVersions() { - final ActorRef sender = getSender(); - GetBucketVersionsReply reply = new GetBucketVersionsReply(versions); - sender.tell(reply, getSelf()); - } - - /** - * Update local copy of remote buckets where local copy's version is older. - * - * @param receivedBuckets buckets sent by remote - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} - */ - void receiveUpdateRemoteBuckets(final Map> receivedBuckets) { - LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); - if (receivedBuckets == null || receivedBuckets.isEmpty()) { - //nothing to do - return; - } - - final Map> newBuckets = new HashMap<>(receivedBuckets.size()); - for (Entry> entry : receivedBuckets.entrySet()) { - final Address addr = entry.getKey(); - - if (selfAddress.equals(addr)) { - // Remote cannot update our bucket - continue; - } - - final Bucket receivedBucket = entry.getValue(); - if (receivedBucket == null) { - LOG.debug("Ignoring null bucket from {}", addr); - continue; - } - - // update only if remote version is newer - final long remoteVersion = receivedBucket.getVersion(); - final Long localVersion = versions.get(addr); - if (localVersion != null && remoteVersion <= localVersion.longValue()) { - LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion, - remoteVersion); - continue; - } - newBuckets.put(addr, receivedBucket); - versions.put(addr, remoteVersion); - final Bucket prevBucket = remoteBuckets.put(addr, receivedBucket); - - // Deal with DeathWatch subscriptions - final Optional prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty(); - final Optional curRef = receivedBucket.getWatchActor(); - if (!curRef.equals(prevRef)) { - prevRef.ifPresent(ref -> removeWatch(addr, ref)); - curRef.ifPresent(ref -> addWatch(addr, ref)); - } - - LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion); - } - - LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); - - onBucketsUpdated(newBuckets); - } - - private void addWatch(final Address addr, final ActorRef ref) { - if (!watchedActors.containsKey(ref)) { - getContext().watch(ref); - LOG.debug("Watching {}", ref); - } - watchedActors.put(ref, addr); - } - - private void removeWatch(final Address addr, final ActorRef ref) { - watchedActors.remove(ref, addr); - if (!watchedActors.containsKey(ref)) { - getContext().unwatch(ref); - LOG.debug("No longer watching {}", ref); - } - } - - private void removeBucket(final Address addr) { - final Bucket bucket = remoteBuckets.remove(addr); - if (bucket != null) { - bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref)); - onBucketRemoved(addr, bucket); - } - } - - private void actorTerminated(final Terminated message) { - LOG.info("Actor termination {} received", message); - - for (Address addr : watchedActors.removeAll(message.getActor())) { - versions.remove(addr); - final Bucket bucket = remoteBuckets.remove(addr); - if (bucket != null) { - LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr); - onBucketRemoved(addr, bucket); - } - } - } - - /** - * Callback to subclasses invoked when a bucket is removed. - * - * @param address Remote address - * @param bucket Bucket removed - */ - protected void onBucketRemoved(final Address address, final Bucket bucket) { - // Default noop - } - - /** - * Callback to subclasses invoked when the set of remote buckets is updated. - * - * @param newBuckets Map of address to new bucket. Never null, but can be empty. - */ - protected void onBucketsUpdated(final Map> newBuckets) { - // Default noop - } - - @VisibleForTesting - protected boolean isPersisting() { - return persisting; - } - - public T getLocalData() { - return getLocalBucket().getData(); - } - - private LocalBucket getLocalBucket() { - Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed"); - return localBucket; - } - - protected void updateLocalBucket(final T data) { - final LocalBucket local = getLocalBucket(); - final boolean bumpIncarnation = local.setData(data); - versions.put(selfAddress, local.getVersion()); - - if (bumpIncarnation) { - LOG.debug("Version wrapped. incrementing incarnation"); - - Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue"); - incarnation = incarnation + 1; - - persisting = true; - saveSnapshot(incarnation); - } - } - - public Map> getRemoteBuckets() { - return remoteBuckets; - } - - public Map getVersions() { - return versions; - } -}