- final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
- recordPendingTransaction(command, transaction);
- command.execute(transaction);
- Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- successfulTransactionQueue.offer(transaction);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- // NOOP - handled by failure of transaction chain
- }
- });
+ synchronized (this) {
+ final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+ transactionInFlight = transaction;
+ recordPendingTransaction(command, transaction);
+ command.execute(transaction);
+ Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ forgetSuccessfulTransaction(transaction);
+ command.onSuccess();
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ command.onFailure(throwable);
+ // NOOP - handled by failure of transaction chain
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ }
+ } catch (IllegalStateException e) {
+ if (transactionInFlight != null) {
+ // TODO: This method should distinguish exceptions on which the command should be
+ // retried from exceptions on which the command should NOT be retried.
+ // Then it should retry only the commands which should be retried, otherwise
+ // this method will retry commands which will never be successful forever.
+ offerFailedTransaction(transactionInFlight);