BUG-5280: add ExplicitAsk utility class
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolver.java
index 359b428c99d5060f59baa45ca2414a34eba46f98..249cd4524d5d1f2ea1b925008c035e850abb8a5e 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
-import akka.japi.Function;
-import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -27,12 +25,15 @@ import org.opendaylight.controller.cluster.access.client.BackendInfo;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Function1;
 import scala.compat.java8.FutureConverters;
 
 /**
@@ -57,6 +58,7 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private final ActorContext actorContext;
     // FIXME: this counter should be in superclass somewhere
     private final AtomicLong nextSessionId = new AtomicLong();
+    private final Function1<ActorRef, ?> connectFunction;
 
     @GuardedBy("this")
     private long nextShard = 1;
@@ -64,8 +66,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
-    ModuleShardBackendResolver(final ActorContext actorContext) {
+    ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
+            ABIVersion.current()));
     }
 
     @Override
@@ -112,9 +116,7 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
 
         FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
             LOG.debug("Looking up primary info for {} from {}", shardName, info);
-            return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(),
-                (Function<ActorRef, Object>) replyTo -> new ConnectClientRequest(null, replyTo,
-                    ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT));
+            return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
         }).thenApply(response -> {
             if (response instanceof RequestFailure) {
                 final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;