X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStoreAccess.java;fp=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStoreAccess.java;h=e06e5fb15bfaf38cb7b522eb9f57efe3d6920c75;hb=efff2ad1ea02712f00013aa3b40529ceecf5e29b;hp=b0e1cde3e7b247b8f916b58c1e73a517f78942e7;hpb=b1b86bdf6913465b13edde9a64ede4d9b9d19e3d;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java index b0e1cde3e7..e06e5fb15b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStoreAccess.java @@ -8,10 +8,11 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getBucketsByMembersMessage; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getLocalDataMessage; +import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.getRemoteBucketsMessage; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.removeBucketMessage; import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreActor.updateRemoteBucketsMessage; -import akka.actor.ActorContext; import akka.actor.ActorRef; import akka.actor.Address; import akka.dispatch.OnComplete; @@ -19,10 +20,12 @@ import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import java.util.Collection; import java.util.Map; +import java.util.Objects; import java.util.function.Consumer; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; /** * Convenience access to {@link BucketStoreActor}. Used mostly by {@link Gossiper}. @@ -32,17 +35,19 @@ import java.util.function.Consumer; @Beta @VisibleForTesting public final class BucketStoreAccess { - private final ActorContext context; + private final ActorRef actorRef; + private final ExecutionContext dispatcher; private final Timeout timeout; - BucketStoreAccess(final ActorContext context, final Timeout timeout) { - this.context = Preconditions.checkNotNull(context); - this.timeout = Preconditions.checkNotNull(timeout); + public BucketStoreAccess(final ActorRef actorRef, final ExecutionContext dispatcher, final Timeout timeout) { + this.actorRef = Objects.requireNonNull(actorRef); + this.dispatcher = Objects.requireNonNull(dispatcher); + this.timeout = Objects.requireNonNull(timeout); } > void getBucketsByMembers(final Collection
members, final Consumer>> callback) { - Patterns.ask(context.parent(), getBucketsByMembersMessage(members), timeout) + Patterns.ask(actorRef, getBucketsByMembersMessage(members), timeout) .onComplete(new OnComplete() { @SuppressWarnings("unchecked") @Override @@ -51,11 +56,11 @@ public final class BucketStoreAccess { callback.accept((Map>) success); } } - }, context.dispatcher()); + }, dispatcher); } void getBucketVersions(final Consumer> callback) { - Patterns.ask(context.parent(), Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete() { + Patterns.ask(actorRef, Singletons.GET_BUCKET_VERSIONS, timeout).onComplete(new OnComplete() { @SuppressWarnings("unchecked") @Override public void onComplete(final Throwable failure, final Object success) { @@ -63,16 +68,31 @@ public final class BucketStoreAccess { callback.accept((Map) success); } } - }, context.dispatcher()); + }, dispatcher); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Future> getBucketVersions() { + return (Future) Patterns.ask(actorRef, Singletons.GET_BUCKET_VERSIONS, timeout); } @SuppressWarnings("unchecked") void updateRemoteBuckets(final Map> buckets) { - context.parent().tell(updateRemoteBucketsMessage((Map>) buckets), ActorRef.noSender()); + actorRef.tell(updateRemoteBucketsMessage((Map>) buckets), ActorRef.noSender()); } void removeRemoteBucket(final Address addr) { - context.parent().tell(removeBucketMessage(addr), ActorRef.noSender()); + actorRef.tell(removeBucketMessage(addr), ActorRef.noSender()); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public > Future getLocalData() { + return (Future) Patterns.ask(actorRef, getLocalDataMessage(), timeout); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public > Future>> getRemoteBuckets() { + return (Future) Patterns.ask(actorRef, getRemoteBucketsMessage(), timeout); } public enum Singletons {