+ List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
+ for (CohortInfo info: cohorts) {
+ cohortFutures.add(info.getActorFuture());
+ }
+
+ return cohortFutures;
+ }
+
+ static class CohortInfo {
+ private final Future<ActorSelection> actorFuture;
+ private final Supplier<Short> actorVersionSupplier;
+
+ private volatile ActorSelection resolvedActor;
+
+ CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
+ this.actorFuture = actorFuture;
+ this.actorVersionSupplier = actorVersionSupplier;
+ }
+
+ Future<ActorSelection> getActorFuture() {
+ return actorFuture;
+ }
+
+ ActorSelection getResolvedActor() {
+ return resolvedActor;
+ }
+
+ void setResolvedActor(final ActorSelection resolvedActor) {
+ this.resolvedActor = resolvedActor;
+ }
+
+ short getActorVersion() {
+ checkState(resolvedActor != null, "getActorVersion cannot be called until the actor is resolved");
+ return actorVersionSupplier.get();
+ }
+ }
+
+ private interface MessageSupplier {
+ Object newMessage(TransactionIdentifier transactionId, short version);
+
+ boolean isSerializedReplyType(Object reply);