package org.opendaylight.controller.remote.rpc.registry.gossip;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
+import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.SetMultimap;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Set;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
/**
* A store that syncs its data across nodes in the cluster.
* <p>
* Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
- *
*/
-public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
+public abstract class BucketStoreActor<T extends BucketData<T>> extends
+ AbstractUntypedPersistentActorWithMetering {
+ // Internal marker interface for messages which are just bridges to execute a method
+ @FunctionalInterface
+ private interface ExecuteInActor extends Consumer<BucketStoreActor<?>> {
+
+ }
+
/**
* Buckets owned by other known nodes in the cluster.
*/
private Integer incarnation;
private boolean persisting;
- public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
+ protected BucketStoreActor(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
this.config = Preconditions.checkNotNull(config);
this.initialData = Preconditions.checkNotNull(initialData);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
}
+ static ExecuteInActor getBucketsByMembersMessage(final Collection<Address> members) {
+ return actor -> actor.getBucketsByMembers(members);
+ }
+
+ static ExecuteInActor removeBucketMessage(final Address addr) {
+ return actor -> actor.removeBucket(addr);
+ }
+
+ static ExecuteInActor updateRemoteBucketsMessage(final Map<Address, Bucket<?>> buckets) {
+ return actor -> actor.updateRemoteBuckets(buckets);
+ }
+
+ public final T getLocalData() {
+ return getLocalBucket().getData();
+ }
+
+ public final Map<Address, Bucket<T>> getRemoteBuckets() {
+ return remoteBuckets;
+ }
+
+ public final Map<Address, Long> getVersions() {
+ return versions;
+ }
+
@Override
- public String persistenceId() {
+ public final String persistenceId() {
return persistenceId;
}
}
}
- @SuppressWarnings("unchecked")
@Override
protected void handleCommand(final Object message) throws Exception {
- if (message instanceof GetAllBuckets) {
+ if (GET_ALL_BUCKETS == message) {
// GetAllBuckets is used only in testing
- receiveGetAllBuckets();
+ getSender().tell(getAllBuckets(), self());
return;
}
return;
}
- if (message instanceof GetBucketsByMembers) {
- receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
- } else if (message instanceof GetBucketVersions) {
- receiveGetBucketVersions();
- } else if (message instanceof UpdateRemoteBuckets) {
- receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
- } else if (message instanceof RemoveRemoteBucket) {
- removeBucket(((RemoveRemoteBucket) message).getAddress());
+ if (message instanceof ExecuteInActor) {
+ ((ExecuteInActor) message).accept(this);
+ } else if (GET_BUCKET_VERSIONS == message) {
+ // FIXME: do we need to send ourselves?
+ getSender().tell(ImmutableMap.copyOf(versions), getSelf());
} else if (message instanceof Terminated) {
actorTerminated((Terminated) message);
} else if (message instanceof DeleteSnapshotsSuccess) {
}
@Override
- protected void handleRecover(final Object message) throws Exception {
+ protected final void handleRecover(final Object message) throws Exception {
if (message instanceof RecoveryCompleted) {
if (incarnation != null) {
incarnation = incarnation + 1;
}
}
- protected RemoteRpcProviderConfig getConfig() {
+ protected final RemoteRpcProviderConfig getConfig() {
return config;
}
+ protected final void updateLocalBucket(final T data) {
+ final LocalBucket<T> local = getLocalBucket();
+ final boolean bumpIncarnation = local.setData(data);
+ versions.put(selfAddress, local.getVersion());
+
+ if (bumpIncarnation) {
+ LOG.debug("Version wrapped. incrementing incarnation");
+
+ Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
+ incarnation = incarnation + 1;
+
+ persisting = true;
+ saveSnapshot(incarnation);
+ }
+ }
+
/**
- * Returns all the buckets the this node knows about, self owned + remote.
+ * Callback to subclasses invoked when a bucket is removed.
+ *
+ * @param address Remote address
+ * @param bucket Bucket removed
*/
- @VisibleForTesting
- protected void receiveGetAllBuckets() {
- final ActorRef sender = getSender();
- sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
- }
+ protected abstract void onBucketRemoved(final Address address, final Bucket<T> bucket);
+
+ /**
+ * Callback to subclasses invoked when the set of remote buckets is updated.
+ *
+ * @param newBuckets Map of address to new bucket. Never null, but can be empty.
+ */
+ protected abstract void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets);
/**
* Helper to collect all known buckets.
*
* @return self owned + remote buckets
*/
- Map<Address, Bucket<T>> getAllBuckets() {
+ private Map<Address, Bucket<T>> getAllBuckets() {
Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
return all;
}
- /**
- * Returns buckets for requested members that this node knows about.
- *
- * @param members requested members
- */
- void receiveGetBucketsByMembers(final Set<Address> members) {
- final ActorRef sender = getSender();
- Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
- sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
- }
-
/**
* Helper to collect buckets for requested members.
*
* @param members requested members
- * @return buckets for requested members
*/
- Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
+ private void getBucketsByMembers(final Collection<Address> members) {
Map<Address, Bucket<T>> buckets = new HashMap<>();
//first add the local bucket if asked
}
}
- return buckets;
+ getSender().tell(buckets, getSelf());
}
- /**
- * Returns versions for all buckets known.
- */
- void receiveGetBucketVersions() {
- final ActorRef sender = getSender();
- GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
- sender.tell(reply, getSelf());
+ private void removeBucket(final Address addr) {
+ final Bucket<T> bucket = remoteBuckets.remove(addr);
+ if (bucket != null) {
+ bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
+ onBucketRemoved(addr, bucket);
+ }
}
/**
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
- void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
+ @VisibleForTesting
+ void updateRemoteBuckets(final Map<Address, Bucket<?>> receivedBuckets) {
LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty()) {
//nothing to do
}
final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
- for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
+ for (Entry<Address, Bucket<?>> entry : receivedBuckets.entrySet()) {
final Address addr = entry.getKey();
if (selfAddress.equals(addr)) {
continue;
}
- final Bucket<T> receivedBucket = entry.getValue();
+ @SuppressWarnings("unchecked")
+ final Bucket<T> receivedBucket = (Bucket<T>) entry.getValue();
if (receivedBucket == null) {
LOG.debug("Ignoring null bucket from {}", addr);
continue;
}
}
- private void removeBucket(final Address addr) {
- final Bucket<T> bucket = remoteBuckets.remove(addr);
- if (bucket != null) {
- bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
- onBucketRemoved(addr, bucket);
- }
- }
-
private void actorTerminated(final Terminated message) {
LOG.info("Actor termination {} received", message);
}
}
- /**
- * Callback to subclasses invoked when a bucket is removed.
- *
- * @param address Remote address
- * @param bucket Bucket removed
- */
- protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
- // Default noop
- }
-
- /**
- * Callback to subclasses invoked when the set of remote buckets is updated.
- *
- * @param newBuckets Map of address to new bucket. Never null, but can be empty.
- */
- protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
- // Default noop
- }
-
@VisibleForTesting
protected boolean isPersisting() {
return persisting;
}
- public T getLocalData() {
- return getLocalBucket().getData();
- }
-
private LocalBucket<T> getLocalBucket() {
Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
return localBucket;
}
-
- protected void updateLocalBucket(final T data) {
- final LocalBucket<T> local = getLocalBucket();
- final boolean bumpIncarnation = local.setData(data);
- versions.put(selfAddress, local.getVersion());
-
- if (bumpIncarnation) {
- LOG.debug("Version wrapped. incrementing incarnation");
-
- Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
- incarnation = incarnation + 1;
-
- persisting = true;
- saveSnapshot(incarnation);
- }
- }
-
- public Map<Address, Bucket<T>> getRemoteBuckets() {
- return remoteBuckets;
- }
-
- public Map<Address, Long> getVersions() {
- return versions;
- }
}