- } else {
- return false;
- }
- }
-
- @Override
- public AbstractThreePhaseCommitCohort ready() {
- Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
- "Read-only transactions cannot be readied");
-
- final boolean success = seal(TransactionState.READY);
- Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
-
- LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
- txFutureCallbackMap.size());
-
- if (txFutureCallbackMap.isEmpty()) {
- TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
- return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
- }
-
- throttleOperation(txFutureCallbackMap.size());
-
- List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
- for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-
- LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
- txFutureCallback.getShardName(), transactionChainId);
-
- final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
- final Future<ActorSelection> future;
- if (transactionContext != null) {
- // avoid the creation of a promise and a TransactionOperation
- future = transactionContext.readyTransaction();
- } else {
- final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
- txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- promise.completeWith(transactionContext.readyTransaction());
- }
- });
- future = promise.future();
- }
-
- cohortFutures.add(future);