+ public final ClientTransaction createTransaction() {
+ Preconditions.checkState(state != State.CLOSED);
+
+ synchronized (this) {
+ final ClientTransaction ret = doCreateTransaction();
+ openTransactions.put(ret.getIdentifier(), ret);
+ return ret;
+ }
+ }
+
+ @GuardedBy("this")
+ abstract ClientTransaction doCreateTransaction();
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+ *
+ * @param txId Transaction identifier
+ * @param cohort Transaction commit cohort
+ */
+ synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+ final AbstractTransactionCommitCohort cohort) {
+ final ClientTransaction tx = openTransactions.remove(txId);
+ Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+
+ final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
+ Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
+ cohort, txId, previous);
+
+ return cohort;
+ }
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
+ * backend.
+ *
+ * @param txId transaction identifier
+ */
+ synchronized void onTransactionAbort(final TransactionIdentifier txId) {
+ if (openTransactions.remove(txId) == null) {
+ LOG.warn("Could not find aborting transaction {}", txId);
+ }
+ }
+