- return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
- @Override
- public Void apply(Iterable<ActorSelection> actorSelections) {
- cohorts = Lists.newArrayList(actorSelections);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} successfully built cohort path list: {}",
- transactionId, cohorts);
+ final AtomicInteger completed = new AtomicInteger(cohorts.size());
+ final Object lock = new Object();
+ for (final CohortInfo info: cohorts) {
+ info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
+ @Override
+ public void onComplete(final Throwable failure, final ActorSelection actor) {
+ synchronized (lock) {
+ boolean done = completed.decrementAndGet() == 0;
+ if (failure != null) {
+ LOG.debug("Tx {}: a cohort Future failed", transactionId, failure);
+ cohortsResolvedFuture.setException(failure);
+ } else if (!cohortsResolvedFuture.isDone()) {
+ LOG.debug("Tx {}: cohort actor {} resolved", transactionId, actor);
+
+ info.setResolvedActor(actor);
+ if (done) {
+ LOG.debug("Tx {}: successfully resolved all cohort actors", transactionId);
+ cohortsResolvedFuture.set(null);
+ }
+ }
+ }