import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.Address;
-import akka.actor.Props;
+import akka.actor.Terminated;
import akka.cluster.ClusterActorRefProvider;
import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.utils.ConditionalProbe;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A store that syncs its data across nodes in the cluster.
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
-
- private static final Long NO_VERSION = -1L;
-
- protected final Logger log = LoggerFactory.getLogger(getClass());
-
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
/**
* Bucket owned by the node.
*/
- private final BucketImpl<T> localBucket = new BucketImpl<>();
+ private final BucketImpl<T> localBucket;
/**
- * Buckets ownded by other known nodes in the cluster.
+ * Buckets owned by other known nodes in the cluster.
*/
private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
*/
private final Map<Address, Long> versions = new HashMap<>();
+ /**
+ * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
+ * once, possibly being tied to multiple addresses (and by extension, buckets).
+ */
+ private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
+
/**
* Cluster address for this node.
*/
private Address selfAddress;
+ // FIXME: should be part of test-specific subclass
private ConditionalProbe probe;
private final RemoteRpcProviderConfig config;
- public BucketStore(RemoteRpcProviderConfig config) {
+ public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
this.config = Preconditions.checkNotNull(config);
+ this.localBucket = new BucketImpl<>(initialData);
}
@Override
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if ( provider instanceof ClusterActorRefProvider) {
- getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper");
+ if (provider instanceof ClusterActorRefProvider) {
+ getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
}
}
@SuppressWarnings("unchecked")
@Override
- protected void handleReceive(Object message) throws Exception {
+ protected void handleReceive(final Object message) throws Exception {
if (probe != null) {
probe.tell(message, getSelf());
}
- if (message instanceof ConditionalProbe) {
- // The ConditionalProbe is only used for unit tests.
- log.info("Received probe {} {}", getSelf(), message);
- probe = (ConditionalProbe) message;
- // Send back any message to tell the caller we got the probe.
- getSender().tell("Got it", getSelf());
- } else if (message instanceof GetAllBuckets) {
- receiveGetAllBuckets();
- } else if (message instanceof GetBucketsByMembers) {
+ if (message instanceof GetBucketsByMembers) {
receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
+ } else if (message instanceof RemoveRemoteBucket) {
+ removeBucket(((RemoveRemoteBucket) message).getAddress());
+ } else if (message instanceof Terminated) {
+ actorTerminated((Terminated) message);
+ } else if (message instanceof GetAllBuckets) {
+ // GetAllBuckets is used only for unit tests.
+ receiveGetAllBuckets();
+ } else if (message instanceof ConditionalProbe) {
+ // The ConditionalProbe is only used for unit tests.
+ LOG.info("Received probe {} {}", getSelf(), message);
+ probe = (ConditionalProbe) message;
+ // Send back any message to tell the caller we got the probe.
+ getSender().tell("Got it", getSelf());
} else {
- log.debug("Unhandled message [{}]", message);
+ LOG.debug("Unhandled message [{}]", message);
unhandled(message);
}
}
*
* @param members requested members
*/
- void receiveGetBucketsByMembers(Set<Address> members) {
+ void receiveGetBucketsByMembers(final Set<Address> members) {
final ActorRef sender = getSender();
Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
* @param members requested members
* @return buckets for requested members
*/
- Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
+ Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
Map<Address, Bucket<T>> buckets = new HashMap<>();
//first add the local bucket if asked
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
- void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets) {
- log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
+ void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+ LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty()) {
- return; //nothing to do
+ //nothing to do
+ return;
}
- //Remote cant update self's bucket
- receivedBuckets.remove(selfAddress);
+ final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
+ for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+ final Address addr = entry.getKey();
- for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
-
- Long localVersion = versions.get(entry.getKey());
- if (localVersion == null) {
- localVersion = NO_VERSION;
+ if (selfAddress.equals(addr)) {
+ // Remote cannot update our bucket
+ continue;
}
- Bucket<T> receivedBucket = entry.getValue();
-
+ final Bucket<T> receivedBucket = entry.getValue();
if (receivedBucket == null) {
+ LOG.debug("Ignoring null bucket from {}", addr);
continue;
}
- Long remoteVersion = receivedBucket.getVersion();
- if (remoteVersion == null) {
- remoteVersion = NO_VERSION;
+ // update only if remote version is newer
+ final long remoteVersion = receivedBucket.getVersion();
+ final Long localVersion = versions.get(addr);
+ if (localVersion != null && remoteVersion <= localVersion.longValue()) {
+ LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
+ remoteVersion);
+ continue;
}
-
- //update only if remote version is newer
- if ( remoteVersion.longValue() > localVersion.longValue() ) {
- remoteBuckets.put(entry.getKey(), receivedBucket);
- versions.put(entry.getKey(), remoteVersion);
+ newBuckets.put(addr, receivedBucket);
+ versions.put(addr, remoteVersion);
+ final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
+
+ // Deal with DeathWatch subscriptions
+ final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
+ final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
+ if (!curRef.equals(prevRef)) {
+ prevRef.ifPresent(ref -> removeWatch(addr, ref));
+ curRef.ifPresent(ref -> addWatch(addr, ref));
}
+
+ LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
+ }
+
+ LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+
+ onBucketsUpdated(newBuckets);
+ }
+
+ private void addWatch(final Address addr, final ActorRef ref) {
+ if (!watchedActors.containsKey(ref)) {
+ getContext().watch(ref);
+ LOG.debug("Watching {}", ref);
}
+ watchedActors.put(ref, addr);
+ }
- log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ private void removeWatch(final Address addr, final ActorRef ref) {
+ watchedActors.remove(ref, addr);
+ if (!watchedActors.containsKey(ref)) {
+ getContext().unwatch(ref);
+ LOG.debug("No longer watching {}", ref);
+ }
+ }
- onBucketsUpdated();
+ private void removeBucket(final Address addr) {
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
+ if (bucket != null) {
+ bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+ onBucketRemoved(addr, bucket);
+ }
}
- protected void onBucketsUpdated() {
+ private void actorTerminated(final Terminated message) {
+ LOG.info("Actor termination {} received", message);
+
+ for (Address addr : watchedActors.removeAll(message.getActor())) {
+ versions.remove(addr);
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
+ if (bucket != null) {
+ LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+ onBucketRemoved(addr, bucket);
+ }
+ }
+ }
+
+ /**
+ * Callback to subclasses invoked when a bucket is removed.
+ *
+ * @param address Remote address
+ * @param bucket Bucket removed
+ */
+ protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
+ // Default noop
+ }
+
+ /**
+ * Callback to subclasses invoked when the set of remote buckets is updated.
+ *
+ * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+ */
+ protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
+ // Default noop
}
public BucketImpl<T> getLocalBucket() {
return localBucket;
}
- protected void updateLocalBucket(T data) {
+ protected void updateLocalBucket(final T data) {
localBucket.setData(data);
versions.put(selfAddress, localBucket.getVersion());
}