+ private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
+ final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
+ final Function<List<ShardResult>, T> resultDataSupplier,
+ final String failureLogMsgPrefix) {
+ final SettableFuture<RpcResult<T>> returnFuture = SettableFuture.create();
+ final List<ShardResult> shardResults = new ArrayList<>();
+ for(final Entry<ListenableFuture<Success>, ShardResultBuilder> entry: shardResultData) {
+ Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
+ @Override
+ public void onSuccess(Success result) {
+ synchronized(shardResults) {
+ ShardResultBuilder shardResult = entry.getValue();
+ LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(),
+ shardResult.getDataStoreType());
+ shardResults.add(shardResult.setSucceeded(true).build());
+ checkIfComplete();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ synchronized(shardResults) {
+ ShardResultBuilder shardResult = entry.getValue();
+ LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(),
+ shardResult.getDataStoreType(), t);
+ shardResults.add(shardResult.setSucceeded(false).setErrorMessage(
+ Throwables.getRootCause(t).getMessage()).build());
+ checkIfComplete();
+ }
+ }
+
+ void checkIfComplete() {
+ LOG.debug("checkIfComplete: expected {}, actual {}", shardResultData.size(), shardResults.size());
+ if(shardResults.size() == shardResultData.size()) {
+ returnFuture.set(newSuccessfulResult(resultDataSupplier.apply(shardResults)));
+ }
+ }
+ });
+ }
+ return returnFuture;
+ }
+
+ private <T> void sendMessageToManagerForConfiguredShards(DataStoreType dataStoreType,
+ List<Entry<ListenableFuture<T>, ShardResultBuilder>> shardResultData,
+ Function<String, Object> messageSupplier) {
+ ActorContext actorContext = dataStoreType == DataStoreType.Config ?
+ configDataStore.getActorContext() : operDataStore.getActorContext();
+ Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+
+ LOG.debug("Sending message to all shards {} for data store {}", allShardNames, actorContext.getDataStoreType());
+
+ for(String shardName: allShardNames) {
+ ListenableFuture<T> future = this.<T>ask(actorContext.getShardManager(), messageSupplier.apply(shardName),
+ SHARD_MGR_TIMEOUT);
+ shardResultData.add(new SimpleEntry<ListenableFuture<T>, ShardResultBuilder>(future,
+ new ShardResultBuilder().setShardName(shardName).setDataStoreType(dataStoreType)));
+ }
+ }
+