- private Future<Iterable<Object>> invokeCohorts(Object message) {
- List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
- for(ActorSelection cohort : cohorts) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
- }
- futureList.add(actorContext.executeOperationAsync(cohort, message, actorContext.getTransactionCommitOperationTimeout()));
+ private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
+ CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
+
+ LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
+
+ Future<Object> future = actorUtils.executeOperationAsync(toCohortInfo.getResolvedActor(),
+ message.toSerializable(), actorUtils.getTransactionCommitOperationTimeout());
+ future.onComplete(onComplete, actorUtils.getClientDispatcher());
+ }
+
+ private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
+ List<Future<Object>> futureList = new ArrayList<>(cohorts.size());
+ for (CohortInfo cohort : cohorts) {
+ Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
+
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message , cohort.getResolvedActor());
+
+ futureList.add(actorUtils.executeOperationAsync(cohort.getResolvedActor(), message,
+ actorUtils.getTransactionCommitOperationTimeout()));