+ final boolean isSingleShard = txFutureCallbackMap.size() == 1;
+ return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
+ TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+
+ LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+ txFutureCallback.getShardName(), getTransactionChainId());
+
+ final OperationCallback.Reference operationCallbackRef =
+ new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
+ final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+ final Future future;
+ if (transactionContext != null) {
+ // avoid the creation of a promise and a TransactionOperation
+ future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
+ } else {
+ final Promise promise = akka.dispatch.Futures.promise();
+ txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+ }
+ });
+ future = promise.future();
+ }
+
+ return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+ }
+
+ private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+ OperationCallback.Reference operationCallbackRef) {
+ if(transactionContext.supportsDirectCommit()) {
+ TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+ operationCallbackRef.set(rateLimitingCallback);
+ rateLimitingCallback.run();
+ return transactionContext.directCommit();
+ } else {
+ return transactionContext.readyTransaction();
+ }
+ }
+
+ private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {