- private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
- return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
- cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
+ private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
+ LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
+
+ final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
+ for (Success s : successfulFromPrevious) {
+ final ActorRef actor = s.getCohort();
+ ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
+ }
+ return ret;
+ }
+
+ @Nonnull
+ private CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+ final State currentState, final State afterState) {
+ LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+ final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
+ Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
+ ExecutionContexts.global());
+
+ aggregateFuture.onComplete(new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(final Throwable failure, final Iterable<Object> results) {
+ callbackExecutor.execute(
+ () -> processResponses(failure, results, currentState, afterState, returnFuture));
+ }
+ }, ExecutionContexts.global());
+
+ return returnFuture;