+ future = promise.future();
+ }
+
+ return future;
+ }
+
+ <T> CheckedFuture<T, ReadFailedException> enqueueReadOperation(final ReadOperation<T> op) {
+
+ CheckedFuture<T, ReadFailedException> future;
+
+ if (transactionContext != null) {
+ future = op.invoke(transactionContext);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ final SettableFuture<T> proxyFuture = SettableFuture.create();
+ addTxOperationOnComplete(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ Futures.addCallback(op.invoke(transactionContext), new FutureCallback<T>() {
+ @Override
+ public void onSuccess(T data) {
+ proxyFuture.set(data);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ proxyFuture.setException(t);
+ }
+ });
+ }
+ });
+
+ future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+ }
+
+ return future;
+ }
+
+ void enqueueModifyOperation(final TransactionOperation op) {
+
+ if (transactionContext != null) {
+ op.invoke(transactionContext);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ addTxOperationOnComplete(op);
+ }
+ }
+
+
+
+
+
+ /**
+ * Performs a CreateTransaction try async.
+ */
+ private void tryCreateTransaction() {
+ Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
+ new CreateTransaction(identifier.toString(),
+ TransactionProxy.this.transactionType.ordinal(),