package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
import akka.actor.Address;
-import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.cluster.Cluster;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-
+import akka.actor.Terminated;
+import akka.cluster.ClusterActorRefProvider;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
import java.util.HashMap;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.utils.ConditionalProbe;
/**
* A store that syncs its data across nodes in the cluster.
* It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
* A node can write ONLY to its bucket. This way, write conflicts are avoided.
+ *
* <p>
- * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore extends UntypedActor {
-
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
+public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
/**
- * Bucket owned by the node
+ * Bucket owned by the node.
*/
- private BucketImpl localBucket = new BucketImpl();;
+ private final BucketImpl<T> localBucket;
/**
- * Buckets ownded by other known nodes in the cluster
+ * Buckets owned by other known nodes in the cluster.
*/
- private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+ private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
/**
- * Bucket version for every known node in the cluster including this node
+ * Bucket version for every known node in the cluster including this node.
*/
- private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+ private final Map<Address, Long> versions = new HashMap<>();
/**
- * Cluster address for this node
+ * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
+ * once, possibly being tied to multiple addresses (and by extension, buckets).
*/
- private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
+ private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
/**
- * Our private gossiper
+ * Cluster address for this node.
*/
- private ActorRef gossiper;
+ private Address selfAddress;
- public BucketStore(){
- gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
- }
+ // FIXME: should be part of test-specific subclass
+ private ConditionalProbe probe;
- /**
- * This constructor is useful for testing.
- * TODO: Pass Props instead of ActorRef
- *
- * @param gossiper
- */
- public BucketStore(ActorRef gossiper){
- this.gossiper = gossiper;
+ private final RemoteRpcProviderConfig config;
+
+ public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
+ this.config = Preconditions.checkNotNull(config);
+ this.localBucket = new BucketImpl<>(initialData);
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+ public void preStart() {
+ ActorRefProvider provider = getContext().provider();
+ selfAddress = provider.getDefaultAddress();
- if (message instanceof UpdateBucket)
- receiveUpdateBucket(((UpdateBucket) message).getBucket());
-
- else if (message instanceof GetAllBuckets)
- receiveGetAllBucket();
+ if (provider instanceof ClusterActorRefProvider) {
+ getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
+ }
+ }
- else if (message instanceof GetLocalBucket)
- receiveGetLocalBucket();
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void handleReceive(final Object message) throws Exception {
+ if (probe != null) {
+ probe.tell(message, getSelf());
+ }
- else if (message instanceof GetBucketsByMembers)
+ if (message instanceof GetBucketsByMembers) {
receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
-
- else if (message instanceof GetBucketVersions)
+ } else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
-
- else if (message instanceof UpdateRemoteBuckets)
- receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
-
- else {
- log.debug("Unhandled message [{}]", message);
+ } else if (message instanceof UpdateRemoteBuckets) {
+ receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
+ } else if (message instanceof RemoveRemoteBucket) {
+ removeBucket(((RemoveRemoteBucket) message).getAddress());
+ } else if (message instanceof Terminated) {
+ actorTerminated((Terminated) message);
+ } else if (message instanceof GetAllBuckets) {
+ // GetAllBuckets is used only for unit tests.
+ receiveGetAllBuckets();
+ } else if (message instanceof ConditionalProbe) {
+ // The ConditionalProbe is only used for unit tests.
+ LOG.info("Received probe {} {}", getSelf(), message);
+ probe = (ConditionalProbe) message;
+ // Send back any message to tell the caller we got the probe.
+ getSender().tell("Got it", getSelf());
+ } else {
+ LOG.debug("Unhandled message [{}]", message);
unhandled(message);
}
-
- }
-
- /**
- * Returns a copy of bucket owned by this node
- */
- private void receiveGetLocalBucket() {
- final ActorRef sender = getSender();
- GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
- sender.tell(reply, getSelf());
}
- /**
- * Updates the bucket owned by this node
- *
- * @param updatedBucket
- */
- void receiveUpdateBucket(Bucket updatedBucket){
-
- localBucket = (BucketImpl) updatedBucket;
- versions.put(selfAddress, localBucket.getVersion());
+ protected RemoteRpcProviderConfig getConfig() {
+ return config;
}
/**
- * Returns all the buckets the this node knows about, self owned + remote
+ * Returns all the buckets the this node knows about, self owned + remote.
*/
- void receiveGetAllBucket(){
+ void receiveGetAllBuckets() {
final ActorRef sender = getSender();
- sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
+ sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
}
/**
- * Helper to collect all known buckets
+ * Helper to collect all known buckets.
*
* @return self owned + remote buckets
*/
- Map<Address, Bucket> getAllBuckets(){
- Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
+ Map<Address, Bucket<T>> getAllBuckets() {
+ Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
- all.put(selfAddress, localBucket);
+ all.put(selfAddress, new BucketImpl<>(localBucket));
//then get all remote buckets
all.putAll(remoteBuckets);
}
/**
- * Returns buckets for requested members that this node knows about
+ * Returns buckets for requested members that this node knows about.
*
* @param members requested members
*/
- void receiveGetBucketsByMembers(Set<Address> members){
+ void receiveGetBucketsByMembers(final Set<Address> members) {
final ActorRef sender = getSender();
- Map<Address, Bucket> buckets = getBucketsByMembers(members);
- sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
+ Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
+ sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
}
/**
- * Helper to collect buckets for requested memebers
+ * Helper to collect buckets for requested members.
*
* @param members requested members
- * @return buckets for requested memebers
+ * @return buckets for requested members
*/
- Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
- Map<Address, Bucket> buckets = new HashMap<>();
+ Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
+ Map<Address, Bucket<T>> buckets = new HashMap<>();
//first add the local bucket if asked
- if (members.contains(selfAddress))
- buckets.put(selfAddress, localBucket);
+ if (members.contains(selfAddress)) {
+ buckets.put(selfAddress, new BucketImpl<>(localBucket));
+ }
//then get buckets for requested remote nodes
- for (Address address : members){
- if (remoteBuckets.containsKey(address))
+ for (Address address : members) {
+ if (remoteBuckets.containsKey(address)) {
buckets.put(address, remoteBuckets.get(address));
+ }
}
return buckets;
}
/**
- * Returns versions for all buckets known
+ * Returns versions for all buckets known.
*/
- void receiveGetBucketVersions(){
+ void receiveGetBucketVersions() {
final ActorRef sender = getSender();
GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
sender.tell(reply, getSelf());
}
/**
- * Update local copy of remote buckets where local copy's version is older
+ * Update local copy of remote buckets where local copy's version is older.
*
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
- void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
+ void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+ LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
+ if (receivedBuckets == null || receivedBuckets.isEmpty()) {
+ //nothing to do
+ return;
+ }
+
+ final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
+ for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+ final Address addr = entry.getKey();
- if (receivedBuckets == null || receivedBuckets.isEmpty())
- return; //nothing to do
+ if (selfAddress.equals(addr)) {
+ // Remote cannot update our bucket
+ continue;
+ }
- //Remote cant update self's bucket
- receivedBuckets.remove(selfAddress);
+ final Bucket<T> receivedBucket = entry.getValue();
+ if (receivedBucket == null) {
+ LOG.debug("Ignoring null bucket from {}", addr);
+ continue;
+ }
- for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
+ // update only if remote version is newer
+ final long remoteVersion = receivedBucket.getVersion();
+ final Long localVersion = versions.get(addr);
+ if (localVersion != null && remoteVersion <= localVersion.longValue()) {
+ LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
+ remoteVersion);
+ continue;
+ }
+ newBuckets.put(addr, receivedBucket);
+ versions.put(addr, remoteVersion);
+ final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
+
+ // Deal with DeathWatch subscriptions
+ final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
+ final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
+ if (!curRef.equals(prevRef)) {
+ prevRef.ifPresent(ref -> removeWatch(addr, ref));
+ curRef.ifPresent(ref -> addWatch(addr, ref));
+ }
- Long localVersion = versions.get(entry.getKey());
- if (localVersion == null) localVersion = -1L;
+ LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
+ }
- Bucket receivedBucket = entry.getValue();
+ LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
- if (receivedBucket == null)
- continue;
+ onBucketsUpdated(newBuckets);
+ }
- Long remoteVersion = receivedBucket.getVersion();
- if (remoteVersion == null) remoteVersion = -1L;
+ private void addWatch(final Address addr, final ActorRef ref) {
+ if (!watchedActors.containsKey(ref)) {
+ getContext().watch(ref);
+ LOG.debug("Watching {}", ref);
+ }
+ watchedActors.put(ref, addr);
+ }
- //update only if remote version is newer
- if ( remoteVersion > localVersion ) {
- remoteBuckets.put(entry.getKey(), receivedBucket);
- versions.put(entry.getKey(), remoteVersion);
- }
+ private void removeWatch(final Address addr, final ActorRef ref) {
+ watchedActors.remove(ref, addr);
+ if (!watchedActors.containsKey(ref)) {
+ getContext().unwatch(ref);
+ LOG.debug("No longer watching {}", ref);
}
+ }
- log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ private void removeBucket(final Address addr) {
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
+ if (bucket != null) {
+ bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+ onBucketRemoved(addr, bucket);
+ }
}
- ///
- ///Getter Setters
- ///
+ private void actorTerminated(final Terminated message) {
+ LOG.info("Actor termination {} received", message);
- BucketImpl getLocalBucket() {
- return localBucket;
+ for (Address addr : watchedActors.removeAll(message.getActor())) {
+ versions.remove(addr);
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
+ if (bucket != null) {
+ LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
+ onBucketRemoved(addr, bucket);
+ }
+ }
}
- void setLocalBucket(BucketImpl localBucket) {
- this.localBucket = localBucket;
+ /**
+ * Callback to subclasses invoked when a bucket is removed.
+ *
+ * @param address Remote address
+ * @param bucket Bucket removed
+ */
+ protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
+ // Default noop
}
- ConcurrentMap<Address, Bucket> getRemoteBuckets() {
- return remoteBuckets;
+ /**
+ * Callback to subclasses invoked when the set of remote buckets is updated.
+ *
+ * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+ */
+ protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
+ // Default noop
}
- void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
- this.remoteBuckets = remoteBuckets;
+ public BucketImpl<T> getLocalBucket() {
+ return localBucket;
}
- ConcurrentMap<Address, Long> getVersions() {
- return versions;
+ protected void updateLocalBucket(final T data) {
+ localBucket.setData(data);
+ versions.put(selfAddress, localBucket.getVersion());
}
- void setVersions(ConcurrentMap<Address, Long> versions) {
- this.versions = versions;
+ public Map<Address, Bucket<T>> getRemoteBuckets() {
+ return remoteBuckets;
}
- Address getSelfAddress() {
- return selfAddress;
+ public Map<Address, Long> getVersions() {
+ return versions;
}
}