Fix remaining CS warnings in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / ModuleShardBackendResolver.java
index 359b428c99d5060f59baa45ca2414a34eba46f98..de115f06638878a343499a9f72d071324e1827e9 100644 (file)
@@ -8,8 +8,6 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
-import akka.japi.Function;
-import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -27,12 +25,15 @@ import org.opendaylight.controller.cluster.access.client.BackendInfo;
 import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Function1;
 import scala.compat.java8.FutureConverters;
 
 /**
@@ -57,6 +58,7 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private final ActorContext actorContext;
     // FIXME: this counter should be in superclass somewhere
     private final AtomicLong nextSessionId = new AtomicLong();
+    private final Function1<ActorRef, ?> connectFunction;
 
     @GuardedBy("this")
     private long nextShard = 1;
@@ -64,8 +66,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
     private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
 
     // FIXME: we really need just ActorContext.findPrimaryShardAsync()
-    ModuleShardBackendResolver(final ActorContext actorContext) {
+    ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
+            ABIVersion.current()));
     }
 
     @Override
@@ -89,10 +93,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
                 if (cookie == null) {
                     cookie = nextShard++;
 
-                    Builder<String, Long> b = ImmutableBiMap.builder();
-                    b.putAll(shards);
-                    b.put(shardName, cookie);
-                    shards = b.build();
+                    Builder<String, Long> builder = ImmutableBiMap.builder();
+                    builder.putAll(shards);
+                    builder.put(shardName, cookie);
+                    shards = builder.build();
                 }
             }
         }
@@ -108,13 +112,11 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
             return NULL_FUTURE;
         }
 
-        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<ShardBackendInfo>();
+        final CompletableFuture<ShardBackendInfo> ret = new CompletableFuture<>();
 
         FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
             LOG.debug("Looking up primary info for {} from {}", shardName, info);
-            return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(),
-                (Function<ActorRef, Object>) replyTo -> new ConnectClientRequest(null, replyTo,
-                    ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT));
+            return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
         }).thenApply(response -> {
             if (response instanceof RequestFailure) {
                 final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;
@@ -130,9 +132,9 @@ final class ModuleShardBackendResolver extends BackendInfoResolver<ShardBackendI
             return new ShardBackendInfo(success.getBackend(),
                 nextSessionId.getAndIncrement(), success.getVersion(), shardName, UnsignedLong.fromLongBits(cookie),
                 success.getDataTree(), success.getMaxMessages());
-        }).whenComplete((info, t) -> {
-            if (t != null) {
-                ret.completeExceptionally(t);
+        }).whenComplete((info, throwablw) -> {
+            if (throwablw != null) {
+                ret.completeExceptionally(throwablw);
             } else {
                 ret.complete(info);
             }