- public String getTransactionChainId() {
- return transactionChainId;
- }
-
- protected ActorContext getActorContext() {
- return actorContext;
- }
-
- /**
- * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
- * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
- * retry task after a short delay.
- * <p>
- * The end result from a completed CreateTransaction message is a TransactionContext that is
- * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache and executed once the CreateTransaction completes,
- * successfully or not.
- */
- private class TransactionFutureCallback extends OnComplete<Object> {
-
- /**
- * The list of transaction operations to execute once the CreateTransaction completes.
- */
- @GuardedBy("txOperationsOnComplete")
- private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
-
- /**
- * The TransactionContext resulting from the CreateTransaction reply.
- */
- private volatile TransactionContext transactionContext;
-
- /**
- * The target primary shard.
- */
- private volatile ActorSelection primaryShard;
-
- private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
- getShardLeaderElectionTimeout().duration().toMillis() /
- CREATE_TX_TRY_INTERVAL.toMillis());
-
- private final String shardName;
-
- TransactionFutureCallback(String shardName) {
- this.shardName = shardName;
- }
-
- String getShardName() {
- return shardName;
- }
-
- TransactionContext getTransactionContext() {
- return transactionContext;
- }
-
-
- /**
- * Sets the target primary shard and initiates a CreateTransaction try.
- */
- void setPrimaryShard(ActorSelection primaryShard) {
- this.primaryShard = primaryShard;
-
- if(transactionType == TransactionType.WRITE_ONLY &&
- actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
- getIdentifier(), primaryShard);
-
- // For write-only Tx's we prepare the transaction modifications directly on the shard actor
- // to avoid the overhead of creating a separate transaction actor.
- // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
- executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
- this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
- } else {
- tryCreateTransaction();
- }
+ private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+ OperationCallback.Reference operationCallbackRef) {
+ if (transactionContext.supportsDirectCommit()) {
+ TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
+ txContextFactory.getActorContext());
+ operationCallbackRef.set(rateLimitingCallback);
+ rateLimitingCallback.run();
+ return transactionContext.directCommit();
+ } else {
+ return transactionContext.readyTransaction();