+ /**
+ * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
+ * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
+ * retry task after a short delay.
+ * <p>
+ * The end result from a completed CreateTransaction message is a TransactionContext that is
+ * used to perform transaction operations. Transaction operations that occur before the
+ * CreateTransaction completes are cache and executed once the CreateTransaction completes,
+ * successfully or not.
+ */
+ private class TransactionFutureCallback extends OnComplete<Object> {
+
+ /**
+ * The list of transaction operations to execute once the CreateTransaction completes.
+ */
+ @GuardedBy("txOperationsOnComplete")
+ private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
+
+ /**
+ * The TransactionContext resulting from the CreateTransaction reply.
+ */
+ private volatile TransactionContext transactionContext;
+
+ /**
+ * The target primary shard.
+ */
+ private volatile ActorSelection primaryShard;
+
+ private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
+ getShardLeaderElectionTimeout().duration().toMillis() /
+ CREATE_TX_TRY_INTERVAL.toMillis());
+
+ private final String shardName;
+
+ TransactionFutureCallback(String shardName) {
+ this.shardName = shardName;
+ }
+
+ String getShardName() {
+ return shardName;
+ }
+
+ TransactionContext getTransactionContext() {
+ return transactionContext;
+ }
+
+
+ /**
+ * Sets the target primary shard and initiates a CreateTransaction try.
+ */
+ void setPrimaryShard(ActorSelection primaryShard) {
+ LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+
+ this.primaryShard = primaryShard;
+ tryCreateTransaction();
+ }
+
+ /**
+ * Adds a TransactionOperation to be executed after the CreateTransaction completes.
+ */
+ void addTxOperationOnComplete(TransactionOperation operation) {
+ synchronized(txOperationsOnComplete) {
+ if(transactionContext == null) {
+ LOG.debug("Tx {} Adding operation on complete {}", identifier);
+
+ txOperationsOnComplete.add(operation);
+ } else {
+ operation.invoke(transactionContext);
+ }
+ }
+ }
+
+
+ <T> Future<T> enqueueFutureOperation(final FutureOperation<T> op) {
+
+ Future<T> future;
+
+ if (transactionContext != null) {
+ future = op.invoke(transactionContext);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ final Promise<T> promise = akka.dispatch.Futures.promise();
+ addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ promise.completeWith(op.invoke(transactionContext));
+ }
+ });
+
+ future = promise.future();
+ }
+
+ return future;
+ }
+
+ <T> CheckedFuture<T, ReadFailedException> enqueueReadOperation(final ReadOperation<T> op) {
+
+ CheckedFuture<T, ReadFailedException> future;
+
+ if (transactionContext != null) {
+ future = op.invoke(transactionContext);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ final SettableFuture<T> proxyFuture = SettableFuture.create();
+ addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ Futures.addCallback(op.invoke(transactionContext), new FutureCallback<T>() {
+ @Override
+ public void onSuccess(T data) {
+ proxyFuture.set(data);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ proxyFuture.setException(t);
+ }
+ });
+ }
+ });
+
+ future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+ }
+
+ return future;
+ }
+
+ void enqueueModifyOperation(final TransactionOperation op) {
+
+ if (transactionContext != null) {
+ op.invoke(transactionContext);