- @Override
- public Future<ActorPath> readyTransaction() {
- LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
- identifier, recordedOperationFutures.size());
-
- // Send the ReadyTransaction message to the Tx actor.
-
- final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
- new ReadyTransaction().toSerializable());
-
- // Combine all the previously recorded put/merge/delete operation reply Futures and the
- // ReadyTransactionReply Future into one Future. If any one fails then the combined
- // Future will fail. We need all prior operations and the ready operation to succeed
- // in order to attempt commit.
-
- List<Future<Object>> futureList =
- Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
- futureList.addAll(recordedOperationFutures);
- futureList.add(replyFuture);
-
- Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
- actorContext.getActorSystem().dispatcher());
-
- // Transform the combined Future into a Future that returns the cohort actor path from
- // the ReadyTransactionReply. That's the end result of the ready operation.
-
- return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
- @Override
- public ActorPath apply(Iterable<Object> notUsed) {
-
- LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
- identifier);
-
- // At this point all the Futures succeeded and we need to extract the cohort
- // actor path from the ReadyTransactionReply. For the recorded operations, they
- // don't return any data so we're only interested that they completed
- // successfully. We could be paranoid and verify the correct reply types but
- // that really should never happen so it's not worth the overhead of
- // de-serializing each reply.
+ /**
+ * Adds a TransactionOperation to be executed after the CreateTransaction completes.
+ */
+ void addTxOperationOnComplete(TransactionOperation operation) {
+ boolean invokeOperation = true;
+ synchronized(txOperationsOnComplete) {
+ if(transactionContext == null) {
+ LOG.debug("Tx {} Adding operation on complete", getIdentifier());