X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FModuleShardBackendResolver.java;h=de115f06638878a343499a9f72d071324e1827e9;hp=359b428c99d5060f59baa45ca2414a34eba46f98;hb=a510fba141230ce9fe8301f9eb0198cc09df46ca;hpb=9b4f21460c6dcb10c381df631d064d05de16546c diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java index 359b428c99..de115f0663 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ModuleShardBackendResolver.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; -import akka.japi.Function; -import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -27,12 +25,15 @@ import org.opendaylight.controller.cluster.access.client.BackendInfo; import org.opendaylight.controller.cluster.access.client.BackendInfoResolver; import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.common.actor.ExplicitAsk; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Function1; import scala.compat.java8.FutureConverters; /** @@ -57,6 +58,7 @@ final class ModuleShardBackendResolver extends BackendInfoResolver connectFunction; @GuardedBy("this") private long nextShard = 1; @@ -64,8 +66,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L); // FIXME: we really need just ActorContext.findPrimaryShardAsync() - ModuleShardBackendResolver(final ActorContext actorContext) { + ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) { this.actorContext = Preconditions.checkNotNull(actorContext); + this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON, + ABIVersion.current())); } @Override @@ -89,10 +93,10 @@ final class ModuleShardBackendResolver extends BackendInfoResolver b = ImmutableBiMap.builder(); - b.putAll(shards); - b.put(shardName, cookie); - shards = b.build(); + Builder builder = ImmutableBiMap.builder(); + builder.putAll(shards); + builder.put(shardName, cookie); + shards = builder.build(); } } } @@ -108,13 +112,11 @@ final class ModuleShardBackendResolver extends BackendInfoResolver ret = new CompletableFuture(); + final CompletableFuture ret = new CompletableFuture<>(); FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> { LOG.debug("Looking up primary info for {} from {}", shardName, info); - return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(), - (Function) replyTo -> new ConnectClientRequest(null, replyTo, - ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT)); + return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT)); }).thenApply(response -> { if (response instanceof RequestFailure) { final RequestFailure failure = (RequestFailure) response; @@ -130,9 +132,9 @@ final class ModuleShardBackendResolver extends BackendInfoResolver { - if (t != null) { - ret.completeExceptionally(t); + }).whenComplete((info, throwablw) -> { + if (throwablw != null) { + ret.completeExceptionally(throwablw); } else { ret.complete(info); }