WriteTrackingReadWriteTransaction wrappedTx = new NonSubmitCancelableReadWriteTransaction(realTx);
try {
txConsumer.accept(wrappedTx);
- if (wrappedTx.isWritten()) {
- // The transaction contains changes, commit it
- return realTx.submit();
- } else {
- // The transaction only handled reads, cancel it
- realTx.cancel();
- return Futures.immediateCheckedFuture(null);
- }
+ return commit(realTx, null, wrappedTx);
// catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
} catch (Exception e) {
if (!realTx.cancel()) {
new WriteTrackingTypedReadWriteTransactionImpl<>(datastoreType, realTx);
try {
txConsumer.accept(wrappedTx);
- if (wrappedTx.isWritten()) {
- // The transaction contains changes, commit it
- return realTx.commit().transform(v -> null, MoreExecutors.directExecutor());
- } else {
- // The transaction only handled reads, cancel it
- realTx.cancel();
- return FluentFuture.from(Futures.immediateFuture(null));
- }
+ return commit(realTx, null, wrappedTx);
// catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
} catch (Exception e) {
if (!realTx.cancel()) {
new WriteTrackingTypedReadWriteTransactionImpl<>(datastoreType, realTx);
try {
R result = txFunction.apply(wrappedTx);
- if (wrappedTx.isWritten()) {
- // The transaction contains changes, commit it
- return realTx.commit().transform(v -> result, MoreExecutors.directExecutor());
- } else {
- // The transaction only handled reads, cancel it
- realTx.cancel();
- return FluentFuture.from(Futures.immediateFuture(result));
- }
+ return commit(realTx, result, wrappedTx);
// catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
} catch (Exception e) {
if (!realTx.cancel()) {
return chainConsumer.apply(new ManagedTransactionChainImpl(realTxChain));
}
}
+
+ private <R> FluentFuture<R> commit(ReadWriteTransaction realTx, R result, WriteTrackingTransaction wrappedTx) {
+ if (wrappedTx.isWritten()) {
+ // The transaction contains changes, commit it
+ return realTx.commit().transform(v -> result, MoreExecutors.directExecutor());
+ } else {
+ // The transaction only handled reads, cancel it
+ realTx.cancel();
+ return FluentFuture.from(Futures.immediateFuture(result));
+ }
+ }
}
ReadWriteTransaction realTx = transactionFactory.newReadWriteTransaction();
TypedReadWriteTransaction<D> wrappedTx = new TypedReadWriteTransactionImpl<>(datastoreType, realTx);
try {
- R result = txFunction.apply(wrappedTx);
- return realTx.commit().transform(v -> result, MoreExecutors.directExecutor());
+ return commit(realTx, txFunction.apply(wrappedTx));
} catch (Exception e) {
// catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
if (!realTx.cancel()) {
TypedReadWriteTransaction<D> wrappedTx = new TypedReadWriteTransactionImpl<>(datastoreType, realTx);
try {
txConsumer.accept(wrappedTx);
- return realTx.commit().transform(commitInfo -> null, MoreExecutors.directExecutor());
+ return commit(realTx, null);
// catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
} catch (Exception e) {
if (!realTx.cancel()) {
new TypedWriteTransactionImpl<>(datastoreType, realTx);
try {
txConsumer.accept(wrappedTx);
- return realTx.commit().transform(commitInfo -> null, MoreExecutors.directExecutor());
+ return commit(realTx, null);
// catch Exception for both the <E extends Exception> thrown by accept() as well as any RuntimeException
} catch (Exception e) {
if (!realTx.cancel()) {
return FluentFuture.from(immediateFailedFuture(e));
}
}
+
+ private <R> FluentFuture<R> commit(WriteTransaction realTx, R result) {
+ return realTx.commit().transform(v -> result, MoreExecutors.directExecutor());
+ }
}