package org.opendaylight.controller.cluster.datastore.admin;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.Status.Success;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
-import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshotList;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.LocateShardOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClients;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
/**
* Implements the yang RPCs defined in the generated ClusterAdminService interface.
private static final Timeout SHARD_MGR_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
private static final Logger LOG = LoggerFactory.getLogger(ClusterAdminRpcService.class);
+ private static final @NonNull RpcResult<LocateShardOutput> LOCAL_SHARD_RESULT =
+ RpcResultBuilder.success(new LocateShardOutputBuilder()
+ .setMemberNode(new LocalBuilder().setLocal(Empty.getInstance()).build())
+ .build())
+ .build();
private final DistributedDataStoreInterface configDataStore;
private final DistributedDataStoreInterface operDataStore;
this.serializer = serializer;
this.makeLeaderLocalTimeout =
- new Timeout(configDataStore.getActorContext().getDatastoreContext()
+ new Timeout(configDataStore.getActorUtils().getDatastoreContext()
.getShardLeaderElectionTimeout().duration().$times(2));
}
return returnFuture;
}
+ @Override
+ public ListenableFuture<RpcResult<LocateShardOutput>> locateShard(final LocateShardInput input) {
+ final ActorUtils utils;
+ switch (input.getDataStoreType()) {
+ case Config:
+ utils = configDataStore.getActorUtils();
+ break;
+ case Operational:
+ utils = operDataStore.getActorUtils();
+ break;
+ default:
+ return newFailedRpcResultFuture("Unhandled datastore in " + input);
+ }
+
+ final SettableFuture<RpcResult<LocateShardOutput>> ret = SettableFuture.create();
+ utils.findPrimaryShardAsync(input.getShardName()).onComplete(new OnComplete<PrimaryShardInfo>() {
+ @Override
+ public void onComplete(final Throwable failure, final PrimaryShardInfo success) throws Throwable {
+ if (failure != null) {
+ LOG.debug("Failed to find shard for {}", input, failure);
+ ret.setException(failure);
+ return;
+ }
+
+ // Data tree implies local leak
+ if (success.getLocalShardDataTree().isPresent()) {
+ ret.set(LOCAL_SHARD_RESULT);
+ return;
+ }
+
+ final ActorSelection actorPath = success.getPrimaryShardActor();
+ ret.set(newSuccessfulResult(new LocateShardOutputBuilder()
+ .setMemberNode(new LeaderActorRefBuilder()
+ .setLeaderActorRef(actorPath.toSerializationFormat())
+ .build())
+ .build()));
+ }
+ }, utils.getClientDispatcher());
+
+ return ret;
+ }
+
@Override
public ListenableFuture<RpcResult<MakeLeaderLocalOutput>> makeLeaderLocal(final MakeLeaderLocalInput input) {
final String shardName = input.getShardName();
return newFailedRpcResultFuture("A valid DataStoreType must be specified");
}
- ActorContext actorContext = dataStoreType == DataStoreType.Config
- ? configDataStore.getActorContext()
- : operDataStore.getActorContext();
+ ActorUtils actorUtils = dataStoreType == DataStoreType.Config
+ ? configDataStore.getActorUtils() : operDataStore.getActorUtils();
LOG.info("Moving leader to local node {} for shard {}, datastoreType {}",
- actorContext.getCurrentMemberName().getName(), shardName, dataStoreType);
+ actorUtils.getCurrentMemberName().getName(), shardName, dataStoreType);
- final scala.concurrent.Future<ActorRef> localShardReply =
- actorContext.findLocalShardAsync(shardName);
+ final Future<ActorRef> localShardReply = actorUtils.findLocalShardAsync(shardName);
final scala.concurrent.Promise<Object> makeLeaderLocalAsk = akka.dispatch.Futures.promise();
localShardReply.onComplete(new OnComplete<ActorRef>() {
@Override
- public void onComplete(final Throwable failure, final ActorRef actorRef) throws Throwable {
+ public void onComplete(final Throwable failure, final ActorRef actorRef) {
if (failure != null) {
LOG.warn("No local shard found for {} datastoreType {} - Cannot request leadership transfer to"
- + " local shard.", shardName, failure);
+ + " local shard.", shardName, dataStoreType, failure);
makeLeaderLocalAsk.failure(failure);
} else {
makeLeaderLocalAsk
- .completeWith(actorContext
+ .completeWith(actorUtils
.executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
}
}
- }, actorContext.getClientDispatcher());
+ }, actorUtils.getClientDispatcher());
final SettableFuture<RpcResult<MakeLeaderLocalOutput>> future = SettableFuture.create();
- makeLeaderLocalAsk.future().onComplete(new OnComplete<Object>() {
+ makeLeaderLocalAsk.future().onComplete(new OnComplete<>() {
@Override
- public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ public void onComplete(final Throwable failure, final Object success) {
if (failure != null) {
LOG.error("Leadership transfer failed for shard {}.", shardName, failure);
future.set(RpcResultBuilder.<MakeLeaderLocalOutput>failed().withError(ErrorType.APPLICATION,
LOG.debug("Leadership transfer complete");
future.set(RpcResultBuilder.success(new MakeLeaderLocalOutputBuilder().build()).build());
}
- }, actorContext.getClientDispatcher());
+ }, actorUtils.getClientDispatcher());
return future;
}
LOG.info("Adding replicas for all shards");
final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData = new ArrayList<>();
- Function<String, Object> messageSupplier = AddShardReplica::new;
- sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, messageSupplier);
- sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, messageSupplier);
+ sendMessageToManagerForConfiguredShards(DataStoreType.Config, shardResultData, AddShardReplica::new);
+ sendMessageToManagerForConfiguredShards(DataStoreType.Operational, shardResultData, AddShardReplica::new);
return waitForShardResults(shardResultData, shardResults ->
new AddReplicasForAllShardsOutputBuilder().setShardResult(shardResults).build(),
return newFailedRpcResultFuture("A valid file path must be specified");
}
+ final Uint32 timeout = input.getTimeout();
+ final Timeout opTimeout = timeout != null ? Timeout.apply(timeout.longValue(), TimeUnit.SECONDS)
+ : SHARD_MGR_TIMEOUT;
+
final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture = SettableFuture.create();
- ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
- Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
+ ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(new GetSnapshot(opTimeout));
+ Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(final List<DatastoreSnapshot> snapshots) {
saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
return returnFuture;
}
+
+ @Override
+ public ListenableFuture<RpcResult<GetKnownClientsForAllShardsOutput>> getKnownClientsForAllShards(
+ final GetKnownClientsForAllShardsInput input) {
+ final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies =
+ getAllShardLeadersClients();
+ return Futures.whenAllComplete(allShardReplies.values()).call(() -> processReplies(allShardReplies),
+ MoreExecutors.directExecutor());
+ }
+
+ private static RpcResult<GetKnownClientsForAllShardsOutput> processReplies(
+ final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies) {
+ final Map<ShardResultKey, ShardResult> result = Maps.newHashMapWithExpectedSize(allShardReplies.size());
+ for (Entry<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> entry : allShardReplies.entrySet()) {
+ final ListenableFuture<GetKnownClientsReply> future = entry.getValue();
+ final ShardResultBuilder builder = new ShardResultBuilder()
+ .setDataStoreType(entry.getKey().getDataStoreType())
+ .setShardName(entry.getKey().getShardName());
+
+ final GetKnownClientsReply reply;
+ try {
+ reply = Futures.getDone(future);
+ } catch (ExecutionException e) {
+ LOG.debug("Shard {} failed to answer", entry.getKey(), e);
+ final ShardResult sr = builder
+ .setSucceeded(Boolean.FALSE)
+ .setErrorMessage(e.getCause().getMessage())
+ .build();
+ result.put(sr.key(), sr);
+ continue;
+ }
+
+ final ShardResult sr = builder
+ .setSucceeded(Boolean.TRUE)
+ .addAugmentation(new ShardResult1Builder()
+ .setKnownClients(reply.getClients().stream()
+ .map(client -> new KnownClientsBuilder()
+ .setMember(client.getFrontendId().getMemberName().toYang())
+ .setType(client.getFrontendId().getClientType().toYang())
+ .setGeneration(client.getYangGeneration())
+ .build())
+ .collect(Collectors.toMap(KnownClients::key, Function.identity())))
+ .build())
+ .build();
+
+ result.put(sr.key(), sr);
+ }
+
+ return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build())
+ .build();
+ }
+
private static ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
final List<MemberVotingState> memberVotingStatus) {
Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
- final Function<List<ShardResult>, T> resultDataSupplier,
+ final Function<Map<ShardResultKey, ShardResult>, T> resultDataSupplier,
final String failureLogMsgPrefix) {
final SettableFuture<RpcResult<T>> returnFuture = SettableFuture.create();
- final List<ShardResult> shardResults = new ArrayList<>();
+ final Map<ShardResultKey, ShardResult> shardResults = new HashMap<>();
for (final Entry<ListenableFuture<Success>, ShardResultBuilder> entry : shardResultData) {
Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
@Override
public void onSuccess(final Success result) {
synchronized (shardResults) {
- ShardResultBuilder shardResult = entry.getValue();
- LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(),
- shardResult.getDataStoreType());
- shardResults.add(shardResult.setSucceeded(true).build());
+ final ShardResultBuilder builder = entry.getValue();
+ LOG.debug("onSuccess for shard {}, type {}", builder.getShardName(),
+ builder.getDataStoreType());
+ final ShardResult sr = builder.setSucceeded(Boolean.TRUE).build();
+ shardResults.put(sr.key(), sr);
checkIfComplete();
}
}
@Override
public void onFailure(final Throwable failure) {
synchronized (shardResults) {
- ShardResultBuilder shardResult = entry.getValue();
- LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(),
- shardResult.getDataStoreType(), failure);
- shardResults.add(shardResult.setSucceeded(false).setErrorMessage(
- Throwables.getRootCause(failure).getMessage()).build());
+ ShardResultBuilder builder = entry.getValue();
+ LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, builder.getShardName(),
+ builder.getDataStoreType(), failure);
+ final ShardResult sr = builder
+ .setSucceeded(Boolean.FALSE)
+ .setErrorMessage(Throwables.getRootCause(failure).getMessage())
+ .build();
+ shardResults.put(sr.key(), sr);
checkIfComplete();
}
}
private <T> void sendMessageToManagerForConfiguredShards(final DataStoreType dataStoreType,
final List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
final Function<String, Object> messageSupplier) {
- ActorContext actorContext = dataStoreType == DataStoreType.Config ? configDataStore.getActorContext()
- : operDataStore.getActorContext();
- Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+ ActorUtils actorUtils = dataStoreType == DataStoreType.Config ? configDataStore.getActorUtils()
+ : operDataStore.getActorUtils();
+ Set<String> allShardNames = actorUtils.getConfiguration().getAllShardNames();
- LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreName());
+ LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorUtils.getDataStoreName());
for (String shardName: allShardNames) {
- ListenableFuture<T> future = this.ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
+ ListenableFuture<T> future = this.ask(actorUtils.getShardManager(), messageSupplier.apply(shardName),
SHARD_MGR_TIMEOUT);
shardResultData.add(new SimpleEntry<>(future,
new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
private <T> ListenableFuture<List<T>> sendMessageToShardManagers(final Object message) {
Timeout timeout = SHARD_MGR_TIMEOUT;
- ListenableFuture<T> configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
- ListenableFuture<T> operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
+ ListenableFuture<T> configFuture = ask(configDataStore.getActorUtils().getShardManager(), message, timeout);
+ ListenableFuture<T> operFuture = ask(operDataStore.getActorUtils().getShardManager(), message, timeout);
return Futures.allAsList(configFuture, operFuture);
}
private <T> ListenableFuture<T> sendMessageToShardManager(final DataStoreType dataStoreType, final Object message) {
ActorRef shardManager = dataStoreType == DataStoreType.Config
- ? configDataStore.getActorContext().getShardManager()
- : operDataStore.getActorContext().getShardManager();
+ ? configDataStore.getActorUtils().getShardManager()
+ : operDataStore.getActorUtils().getShardManager();
return ask(shardManager, message, SHARD_MGR_TIMEOUT);
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
@SuppressWarnings("checkstyle:IllegalCatch")
private static void saveSnapshotsToFile(final DatastoreSnapshotList snapshots, final String fileName,
final SettableFuture<RpcResult<BackupDatastoreOutput>> returnFuture) {
onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
}
+ @SuppressFBWarnings("SLF4J_SIGN_ONLY_FORMAT")
private static <T> void onMessageFailure(final String msg, final SettableFuture<RpcResult<T>> returnFuture,
final Throwable failure) {
- LOG.error(msg, failure);
+ LOG.error("{}", msg, failure);
returnFuture.set(ClusterAdminRpcService.<T>newFailedRpcResultBuilder(String.format("%s: %s", msg,
failure.getMessage())).build());
}
final SettableFuture<T> returnFuture = SettableFuture.create();
@SuppressWarnings("unchecked")
- scala.concurrent.Future<T> askFuture = (scala.concurrent.Future<T>) Patterns.ask(actor, message, timeout);
+ Future<T> askFuture = (Future<T>) Patterns.ask(actor, message, timeout);
askFuture.onComplete(new OnComplete<T>() {
@Override
public void onComplete(final Throwable failure, final T resp) {
returnFuture.set(resp);
}
}
- }, configDataStore.getActorContext().getClientDispatcher());
+ }, configDataStore.getActorUtils().getClientDispatcher());
return returnFuture;
}
+ private ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> getAllShardLeadersClients() {
+ final ImmutableMap.Builder<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> builder =
+ ImmutableMap.builder();
+
+ addAllShardsClients(builder, DataStoreType.Config, configDataStore.getActorUtils());
+ addAllShardsClients(builder, DataStoreType.Operational, operDataStore.getActorUtils());
+
+ return builder.build();
+ }
+
+ private static void addAllShardsClients(
+ final ImmutableMap.Builder<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> builder,
+ final DataStoreType type, final ActorUtils utils) {
+ for (String shardName : utils.getConfiguration().getAllShardNames()) {
+ final SettableFuture<GetKnownClientsReply> future = SettableFuture.create();
+ builder.put(new ShardIdentifier(type, shardName), future);
+
+ utils.findPrimaryShardAsync(shardName).flatMap(
+ info -> Patterns.ask(info.getPrimaryShardActor(), GetKnownClients.INSTANCE, SHARD_MGR_TIMEOUT),
+ utils.getClientDispatcher()).onComplete(new OnComplete<>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) {
+ if (failure == null) {
+ future.set((GetKnownClientsReply) success);
+ } else {
+ future.setException(failure);
+ }
+ }
+ }, utils.getClientDispatcher());
+ }
+ }
+
private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(final String message) {
return ClusterAdminRpcService.<T>newFailedRpcResultBuilder(message).buildFuture();
}