- /**
- * Interfaces for transaction operations to be invoked later.
- */
- private static interface TransactionOperation {
- void invoke(TransactionContext transactionContext);
- }
-
- /**
- * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
- * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
- * retry task after a short delay.
- * <p>
- * The end result from a completed CreateTransaction message is a TransactionContext that is
- * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache and executed once the CreateTransaction completes,
- * successfully or not.
- */
- private class TransactionFutureCallback extends OnComplete<Object> {
-
- /**
- * The list of transaction operations to execute once the CreateTransaction completes.
- */
- @GuardedBy("txOperationsOnComplete")
- private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
-
- /**
- * The TransactionContext resulting from the CreateTransaction reply.
- */
- private volatile TransactionContext transactionContext;
-
- /**
- * The target primary shard.
- */
- private volatile ActorSelection primaryShard;
-
- private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
- getShardLeaderElectionTimeout().duration().toMillis() /
- CREATE_TX_TRY_INTERVAL.toMillis());
-
- private final String shardName;
-
- TransactionFutureCallback(String shardName) {
- this.shardName = shardName;
- }
-
- String getShardName() {
- return shardName;
- }
-
- TransactionContext getTransactionContext() {
- return transactionContext;
- }
-
-
- /**
- * Sets the target primary shard and initiates a CreateTransaction try.
- */
- void setPrimaryShard(ActorSelection primaryShard) {
- LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
- this.primaryShard = primaryShard;
- tryCreateTransaction();
- }
-
- /**
- * Adds a TransactionOperation to be executed after the CreateTransaction completes.
- */
- void addTxOperationOnComplete(TransactionOperation operation) {
- boolean invokeOperation = true;
- synchronized(txOperationsOnComplete) {
- if(transactionContext == null) {
- LOG.debug("Tx {} Adding operation on complete {}", identifier);
-
- invokeOperation = false;
- txOperationsOnComplete.add(operation);
- }
- }
-
- if(invokeOperation) {
- operation.invoke(transactionContext);
- }
- }
-
- void enqueueTransactionOperation(final TransactionOperation op) {
-
- if (transactionContext != null) {
- op.invoke(transactionContext);
- } else {
- // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
- // callback to be executed after the Tx is created.
- addTxOperationOnComplete(op);
- }
- }
-
- /**
- * Performs a CreateTransaction try async.
- */
- private void tryCreateTransaction() {
- Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
- new CreateTransaction(identifier.toString(),
- TransactionProxy.this.transactionType.ordinal(),
- getTransactionChainId()).toSerializable());
-
- createTxFuture.onComplete(this, actorContext.getClientDispatcher());
- }