package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
-import akka.japi.Function;
-import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.opendaylight.controller.cluster.access.client.BackendInfoResolver;
import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.common.actor.ExplicitAsk;
import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.Function1;
import scala.compat.java8.FutureConverters;
/**
private final ActorContext actorContext;
// FIXME: this counter should be in superclass somewhere
private final AtomicLong nextSessionId = new AtomicLong();
+ private final Function1<ActorRef, ?> connectFunction;
@GuardedBy("this")
private long nextShard = 1;
private volatile BiMap<String, Long> shards = ImmutableBiMap.of(DefaultShardStrategy.DEFAULT_SHARD, 0L);
// FIXME: we really need just ActorContext.findPrimaryShardAsync()
- ModuleShardBackendResolver(final ActorContext actorContext) {
+ ModuleShardBackendResolver(final ClientIdentifier clientId, final ActorContext actorContext) {
this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.connectFunction = ExplicitAsk.toScala(t -> new ConnectClientRequest(clientId, t, ABIVersion.BORON,
+ ABIVersion.current()));
}
@Override
FutureConverters.toJava(actorContext.findPrimaryShardAsync(shardName)).thenCompose(info -> {
LOG.debug("Looking up primary info for {} from {}", shardName, info);
- return FutureConverters.toJava(Patterns.ask(info.getPrimaryShardActor(),
- (Function<ActorRef, Object>) replyTo -> new ConnectClientRequest(null, replyTo,
- ABIVersion.BORON, ABIVersion.current()), DEAD_TIMEOUT));
+ return FutureConverters.toJava(ExplicitAsk.ask(info.getPrimaryShardActor(), connectFunction, DEAD_TIMEOUT));
}).thenApply(response -> {
if (response instanceof RequestFailure) {
final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) response;