- @SuppressWarnings("checkstyle:IllegalCatch")
- private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
- final State afterState) throws TimeoutException, ExecutionException {
- final Iterable<Object> results;
- try {
- results = Await.result(resultsFuture, timeout.duration());
- } catch (Exception e) {
- successfulFromPrevious = null;
- Throwables.propagateIfInstanceOf(e, TimeoutException.class);
- throw Throwables.propagate(e);
+ private @NonNull CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+ final State currentState, final State afterState) {
+ LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+ final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
+ Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
+ ExecutionContexts.global());
+
+ aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(final Throwable failure, final Iterable<Object> results) {
+ callbackExecutor.execute(
+ () -> processResponses(failure, results, currentState, afterState, returnFuture));
+ }
+ }, ExecutionContexts.global());
+
+ return returnFuture;
+ }
+
+ // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
+ // generic type is Void.
+ @SuppressFBWarnings(value = { "NP_NONNULL_PARAM_VIOLATION", "UPM_UNCALLED_PRIVATE_METHOD" },
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
+ private void processResponses(final Throwable failure, final Iterable<Object> results,
+ final State currentState, final State afterState, final CompletableFuture<Void> resultFuture) {
+ if (failure != null) {
+ successfulFromPrevious = Collections.emptyList();
+ resultFuture.completeExceptionally(failure);
+ return;
+ }
+
+ final Collection<Failure> failed = new ArrayList<>(1);
+ final List<Success> successful = new ArrayList<>();
+ for (Object result : results) {
+ if (result instanceof DataTreeCohortActor.Success) {
+ successful.add((Success) result);
+ } else if (result instanceof Status.Failure) {
+ failed.add((Failure) result);
+ } else {
+ LOG.warn("{}: unrecognized response {}, ignoring it", txId, result);
+ }