+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.remote.rpc.registry.gossip;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorRefProvider;
-import akka.actor.Address;
-import akka.actor.PoisonPill;
-import akka.actor.Terminated;
-import akka.cluster.ClusterActorRefProvider;
-import akka.persistence.DeleteSnapshotsFailure;
-import akka.persistence.DeleteSnapshotsSuccess;
-import akka.persistence.RecoveryCompleted;
-import akka.persistence.SaveSnapshotFailure;
-import akka.persistence.SaveSnapshotSuccess;
-import akka.persistence.SnapshotOffer;
-import akka.persistence.SnapshotSelectionCriteria;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-
-/**
- * A store that syncs its data across nodes in the cluster.
- * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
- * A node can write ONLY to its bucket. This way, write conflicts are avoided.
- *
- * <p>
- * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
- * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
- *
- */
-public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
- /**
- * Buckets owned by other known nodes in the cluster.
- */
- private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
-
- /**
- * Bucket version for every known node in the cluster including this node.
- */
- private final Map<Address, Long> versions = new HashMap<>();
-
- /**
- * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
- * once, possibly being tied to multiple addresses (and by extension, buckets).
- */
- private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
-
- private final RemoteRpcProviderConfig config;
- private final String persistenceId;
-
- /**
- * Cluster address for this node.
- */
- private Address selfAddress;
-
- /**
- * Bucket owned by the node. Initialized during recovery (due to incarnation number).
- */
- private LocalBucket<T> localBucket;
- private T initialData;
- private Integer incarnation;
- private boolean persisting;
-
- public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
- this.config = Preconditions.checkNotNull(config);
- this.initialData = Preconditions.checkNotNull(initialData);
- this.persistenceId = Preconditions.checkNotNull(persistenceId);
- }
-
- @Override
- public String persistenceId() {
- return persistenceId;
- }
-
- @Override
- public void preStart() {
- ActorRefProvider provider = getContext().provider();
- selfAddress = provider.getDefaultAddress();
-
- if (provider instanceof ClusterActorRefProvider) {
- getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void handleCommand(final Object message) throws Exception {
- if (message instanceof GetAllBuckets) {
- // GetAllBuckets is used only in testing
- receiveGetAllBuckets();
- return;
- }
-
- if (persisting) {
- handleSnapshotMessage(message);
- return;
- }
-
- if (message instanceof GetBucketsByMembers) {
- receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
- } else if (message instanceof GetBucketVersions) {
- receiveGetBucketVersions();
- } else if (message instanceof UpdateRemoteBuckets) {
- receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
- } else if (message instanceof RemoveRemoteBucket) {
- removeBucket(((RemoveRemoteBucket) message).getAddress());
- } else if (message instanceof Terminated) {
- actorTerminated((Terminated) message);
- } else if (message instanceof DeleteSnapshotsSuccess) {
- LOG.debug("{}: got command: {}", persistenceId(), message);
- } else if (message instanceof DeleteSnapshotsFailure) {
- LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
- ((DeleteSnapshotsFailure) message).cause());
- } else {
- LOG.debug("Unhandled message [{}]", message);
- unhandled(message);
- }
- }
-
- private void handleSnapshotMessage(final Object message) {
- if (message instanceof SaveSnapshotFailure) {
- LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
- persisting = false;
- self().tell(PoisonPill.getInstance(), ActorRef.noSender());
- } else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug("{}: got command: {}", persistenceId(), message);
- SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
- deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
- saved.metadata().timestamp() - 1, 0L, 0L));
- persisting = false;
- unstash();
- } else {
- LOG.debug("{}: stashing command {}", persistenceId(), message);
- stash();
- }
- }
-
- @Override
- protected void handleRecover(final Object message) throws Exception {
- if (message instanceof RecoveryCompleted) {
- if (incarnation != null) {
- incarnation = incarnation + 1;
- } else {
- incarnation = 0;
- }
-
- this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
- initialData = null;
- LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
- persisting = true;
- saveSnapshot(incarnation);
- } else if (message instanceof SnapshotOffer) {
- incarnation = (Integer) ((SnapshotOffer)message).snapshot();
- LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
- } else {
- LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
- }
- }
-
- protected RemoteRpcProviderConfig getConfig() {
- return config;
- }
-
- /**
- * Returns all the buckets the this node knows about, self owned + remote.
- */
- @VisibleForTesting
- protected void receiveGetAllBuckets() {
- final ActorRef sender = getSender();
- sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
- }
-
- /**
- * Helper to collect all known buckets.
- *
- * @return self owned + remote buckets
- */
- Map<Address, Bucket<T>> getAllBuckets() {
- Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
-
- //first add the local bucket
- all.put(selfAddress, getLocalBucket().snapshot());
-
- //then get all remote buckets
- all.putAll(remoteBuckets);
-
- return all;
- }
-
- /**
- * Returns buckets for requested members that this node knows about.
- *
- * @param members requested members
- */
- void receiveGetBucketsByMembers(final Set<Address> members) {
- final ActorRef sender = getSender();
- Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
- sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
- }
-
- /**
- * Helper to collect buckets for requested members.
- *
- * @param members requested members
- * @return buckets for requested members
- */
- Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
- Map<Address, Bucket<T>> buckets = new HashMap<>();
-
- //first add the local bucket if asked
- if (members.contains(selfAddress)) {
- buckets.put(selfAddress, getLocalBucket().snapshot());
- }
-
- //then get buckets for requested remote nodes
- for (Address address : members) {
- if (remoteBuckets.containsKey(address)) {
- buckets.put(address, remoteBuckets.get(address));
- }
- }
-
- return buckets;
- }
-
- /**
- * Returns versions for all buckets known.
- */
- void receiveGetBucketVersions() {
- final ActorRef sender = getSender();
- GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
- sender.tell(reply, getSelf());
- }
-
- /**
- * Update local copy of remote buckets where local copy's version is older.
- *
- * @param receivedBuckets buckets sent by remote
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
- */
- void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
- LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
- if (receivedBuckets == null || receivedBuckets.isEmpty()) {
- //nothing to do
- return;
- }
-
- final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
- for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
- final Address addr = entry.getKey();
-
- if (selfAddress.equals(addr)) {
- // Remote cannot update our bucket
- continue;
- }
-
- final Bucket<T> receivedBucket = entry.getValue();
- if (receivedBucket == null) {
- LOG.debug("Ignoring null bucket from {}", addr);
- continue;
- }
-
- // update only if remote version is newer
- final long remoteVersion = receivedBucket.getVersion();
- final Long localVersion = versions.get(addr);
- if (localVersion != null && remoteVersion <= localVersion.longValue()) {
- LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
- remoteVersion);
- continue;
- }
- newBuckets.put(addr, receivedBucket);
- versions.put(addr, remoteVersion);
- final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
-
- // Deal with DeathWatch subscriptions
- final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
- final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
- if (!curRef.equals(prevRef)) {
- prevRef.ifPresent(ref -> removeWatch(addr, ref));
- curRef.ifPresent(ref -> addWatch(addr, ref));
- }
-
- LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
- }
-
- LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
-
- onBucketsUpdated(newBuckets);
- }
-
- private void addWatch(final Address addr, final ActorRef ref) {
- if (!watchedActors.containsKey(ref)) {
- getContext().watch(ref);
- LOG.debug("Watching {}", ref);
- }
- watchedActors.put(ref, addr);
- }
-
- private void removeWatch(final Address addr, final ActorRef ref) {
- watchedActors.remove(ref, addr);
- if (!watchedActors.containsKey(ref)) {
- getContext().unwatch(ref);
- LOG.debug("No longer watching {}", ref);
- }
- }
-
- private void removeBucket(final Address addr) {
- final Bucket<T> bucket = remoteBuckets.remove(addr);
- if (bucket != null) {
- bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
- onBucketRemoved(addr, bucket);
- }
- }
-
- private void actorTerminated(final Terminated message) {
- LOG.info("Actor termination {} received", message);
-
- for (Address addr : watchedActors.removeAll(message.getActor())) {
- versions.remove(addr);
- final Bucket<T> bucket = remoteBuckets.remove(addr);
- if (bucket != null) {
- LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
- onBucketRemoved(addr, bucket);
- }
- }
- }
-
- /**
- * Callback to subclasses invoked when a bucket is removed.
- *
- * @param address Remote address
- * @param bucket Bucket removed
- */
- protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
- // Default noop
- }
-
- /**
- * Callback to subclasses invoked when the set of remote buckets is updated.
- *
- * @param newBuckets Map of address to new bucket. Never null, but can be empty.
- */
- protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
- // Default noop
- }
-
- @VisibleForTesting
- protected boolean isPersisting() {
- return persisting;
- }
-
- public T getLocalData() {
- return getLocalBucket().getData();
- }
-
- private LocalBucket<T> getLocalBucket() {
- Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
- return localBucket;
- }
-
- protected void updateLocalBucket(final T data) {
- final LocalBucket<T> local = getLocalBucket();
- final boolean bumpIncarnation = local.setData(data);
- versions.put(selfAddress, local.getVersion());
-
- if (bumpIncarnation) {
- LOG.debug("Version wrapped. incrementing incarnation");
-
- Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
- incarnation = incarnation + 1;
-
- persisting = true;
- saveSnapshot(incarnation);
- }
- }
-
- public Map<Address, Bucket<T>> getRemoteBuckets() {
- return remoteBuckets;
- }
-
- public Map<Address, Long> getVersions() {
- return versions;
- }
-}