/*
* Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.remote.rpc.registry.gossip;
import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.Address;
import akka.actor.PoisonPill;
import akka.actor.Terminated;
import akka.cluster.ClusterActorRefProvider;
import akka.persistence.DeleteSnapshotsFailure;
import akka.persistence.DeleteSnapshotsSuccess;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.SetMultimap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
/**
* A store that syncs its data across nodes in the cluster.
* It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
* A node can write ONLY to its bucket. This way, write conflicts are avoided.
*
*
* Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*/
public abstract class BucketStoreActor> extends
AbstractUntypedPersistentActorWithMetering {
// Internal marker interface for messages which are just bridges to execute a method
@FunctionalInterface
private interface ExecuteInActor extends Consumer> {
}
/**
* Buckets owned by other known nodes in the cluster.
*/
private final Map> remoteBuckets = new HashMap<>();
/**
* Bucket version for every known node in the cluster including this node.
*/
private final Map versions = new HashMap<>();
/**
* {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
* once, possibly being tied to multiple addresses (and by extension, buckets).
*/
private final SetMultimap watchedActors = HashMultimap.create(1, 1);
private final RemoteRpcProviderConfig config;
private final String persistenceId;
/**
* Cluster address for this node.
*/
private Address selfAddress;
/**
* Bucket owned by the node. Initialized during recovery (due to incarnation number).
*/
private LocalBucket localBucket;
private T initialData;
private Integer incarnation;
private boolean persisting;
protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
this.config = Preconditions.checkNotNull(config);
this.initialData = Preconditions.checkNotNull(initialData);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
}
static ExecuteInActor getBucketsByMembersMessage(final Collection members) {
return actor -> actor.getBucketsByMembers(members);
}
static ExecuteInActor removeBucketMessage(final Address addr) {
return actor -> actor.removeBucket(addr);
}
static ExecuteInActor updateRemoteBucketsMessage(final Map> buckets) {
return actor -> actor.updateRemoteBuckets(buckets);
}
static ExecuteInActor getLocalDataMessage() {
return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf());
}
static ExecuteInActor getRemoteBucketsMessage() {
return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf());
}
public final T getLocalData() {
return getLocalBucket().getData();
}
public final Map> getRemoteBuckets() {
return remoteBuckets;
}
public final Map getVersions() {
return versions;
}
@Override
public final String persistenceId() {
return persistenceId;
}
@Override
public void preStart() {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
if (provider instanceof ClusterActorRefProvider) {
getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
}
}
@Override
protected void handleCommand(final Object message) throws Exception {
if (GET_ALL_BUCKETS == message) {
// GetAllBuckets is used only in testing
getSender().tell(getAllBuckets(), self());
return;
}
if (persisting) {
handleSnapshotMessage(message);
return;
}
if (message instanceof ExecuteInActor) {
((ExecuteInActor) message).accept(this);
} else if (GET_BUCKET_VERSIONS == message) {
// FIXME: do we need to send ourselves?
getSender().tell(ImmutableMap.copyOf(versions), getSelf());
} else if (message instanceof Terminated) {
actorTerminated((Terminated) message);
} else if (message instanceof DeleteSnapshotsSuccess) {
LOG.debug("{}: got command: {}", persistenceId(), message);
} else if (message instanceof DeleteSnapshotsFailure) {
LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
((DeleteSnapshotsFailure) message).cause());
} else {
LOG.debug("Unhandled message [{}]", message);
unhandled(message);
}
}
private void handleSnapshotMessage(final Object message) {
if (message instanceof SaveSnapshotFailure) {
LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
persisting = false;
self().tell(PoisonPill.getInstance(), ActorRef.noSender());
} else if (message instanceof SaveSnapshotSuccess) {
LOG.debug("{}: got command: {}", persistenceId(), message);
SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
saved.metadata().timestamp() - 1, 0L, 0L));
persisting = false;
unstash();
} else {
LOG.debug("{}: stashing command {}", persistenceId(), message);
stash();
}
}
@Override
protected final void handleRecover(final Object message) {
if (message instanceof RecoveryCompleted) {
if (incarnation != null) {
incarnation = incarnation + 1;
} else {
incarnation = 0;
}
this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
initialData = null;
LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
persisting = true;
saveSnapshot(incarnation);
} else if (message instanceof SnapshotOffer) {
incarnation = (Integer) ((SnapshotOffer)message).snapshot();
LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
} else {
LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
}
}
protected final RemoteRpcProviderConfig getConfig() {
return config;
}
protected final void updateLocalBucket(final T data) {
final LocalBucket local = getLocalBucket();
final boolean bumpIncarnation = local.setData(data);
versions.put(selfAddress, local.getVersion());
if (bumpIncarnation) {
LOG.debug("Version wrapped. incrementing incarnation");
Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
incarnation = incarnation + 1;
persisting = true;
saveSnapshot(incarnation);
}
}
/**
* Callback to subclasses invoked when a bucket is removed.
*
* @param address Remote address
* @param bucket Bucket removed
*/
protected abstract void onBucketRemoved(Address address, Bucket bucket);
/**
* Callback to subclasses invoked when the set of remote buckets is updated.
*
* @param newBuckets Map of address to new bucket. Never null, but can be empty.
*/
protected abstract void onBucketsUpdated(Map> newBuckets);
/**
* Helper to collect all known buckets.
*
* @return self owned + remote buckets
*/
private Map> getAllBuckets() {
Map> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
all.put(selfAddress, getLocalBucket().snapshot());
//then get all remote buckets
all.putAll(remoteBuckets);
return all;
}
/**
* Helper to collect buckets for requested members.
*
* @param members requested members
*/
private void getBucketsByMembers(final Collection members) {
Map> buckets = new HashMap<>();
//first add the local bucket if asked
if (members.contains(selfAddress)) {
buckets.put(selfAddress, getLocalBucket().snapshot());
}
//then get buckets for requested remote nodes
for (Address address : members) {
if (remoteBuckets.containsKey(address)) {
buckets.put(address, remoteBuckets.get(address));
}
}
getSender().tell(buckets, getSelf());
}
private void removeBucket(final Address addr) {
final Bucket bucket = remoteBuckets.remove(addr);
if (bucket != null) {
bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
onBucketRemoved(addr, bucket);
}
versions.remove(addr);
}
/**
* Update local copy of remote buckets where local copy's version is older.
*
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
@VisibleForTesting
void updateRemoteBuckets(final Map> receivedBuckets) {
LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty()) {
//nothing to do
return;
}
final Map> newBuckets = new HashMap<>(receivedBuckets.size());
for (Entry> entry : receivedBuckets.entrySet()) {
final Address addr = entry.getKey();
if (selfAddress.equals(addr)) {
// Remote cannot update our bucket
continue;
}
@SuppressWarnings("unchecked")
final Bucket receivedBucket = (Bucket) entry.getValue();
if (receivedBucket == null) {
LOG.debug("Ignoring null bucket from {}", addr);
continue;
}
// update only if remote version is newer
final long remoteVersion = receivedBucket.getVersion();
final Long localVersion = versions.get(addr);
if (localVersion != null && remoteVersion <= localVersion.longValue()) {
LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
remoteVersion);
continue;
}
newBuckets.put(addr, receivedBucket);
versions.put(addr, remoteVersion);
final Bucket prevBucket = remoteBuckets.put(addr, receivedBucket);
// Deal with DeathWatch subscriptions
final Optional prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
final Optional curRef = receivedBucket.getWatchActor();
if (!curRef.equals(prevRef)) {
prevRef.ifPresent(ref -> removeWatch(addr, ref));
curRef.ifPresent(ref -> addWatch(addr, ref));
}
LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
}
LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
onBucketsUpdated(newBuckets);
}
private void addWatch(final Address addr, final ActorRef ref) {
if (!watchedActors.containsKey(ref)) {
getContext().watch(ref);
LOG.debug("Watching {}", ref);
}
watchedActors.put(ref, addr);
}
private void removeWatch(final Address addr, final ActorRef ref) {
watchedActors.remove(ref, addr);
if (!watchedActors.containsKey(ref)) {
getContext().unwatch(ref);
LOG.debug("No longer watching {}", ref);
}
}
private void actorTerminated(final Terminated message) {
LOG.info("Actor termination {} received", message);
for (Address addr : watchedActors.removeAll(message.getActor())) {
versions.remove(addr);
final Bucket bucket = remoteBuckets.remove(addr);
if (bucket != null) {
LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr);
onBucketRemoved(addr, bucket);
}
}
}
@VisibleForTesting
protected boolean isPersisting() {
return persisting;
}
private LocalBucket getLocalBucket() {
Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
return localBucket;
}
}