AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
@GuardedBy("this")
- private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+ private final Map<TransactionIdentifier, AbstractClientHandle<?>> openTransactions = new HashMap<>();
@GuardedBy("this")
private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
LOG.debug("Force-closing history {}", getIdentifier(), cause);
synchronized (this) {
- for (ClientTransaction t : openTransactions.values()) {
+ for (AbstractClientHandle<?> t : openTransactions.values()) {
t.localAbort(cause);
}
openTransactions.clear();
LOG.debug("Create history response {}", response);
}
- final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
+ private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) {
while (true) {
- final ProxyHistory history;
try {
- history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+ return histories.computeIfAbsent(shard, this::createHistoryProxy);
} catch (InversibleLockException e) {
LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard);
e.awaitResolution();
LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard);
- continue;
}
+ }
+ }
+
+ final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) {
+ return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true);
+ }
+
+ final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
+ return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false);
+ }
- return history.createTransactionProxy(transactionId);
+ private void checkNotClosed() {
+ if (state == State.CLOSED) {
+ throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
}
}
/**
- * Allocate a {@link ClientTransaction}.
+ * Allocate a new {@link ClientTransaction}.
*
* @return A new {@link ClientTransaction}
* @throws TransactionChainClosedException if this history is closed
+ * @throws IllegalStateException if a previous dependent transaction has not been closed
*/
public final ClientTransaction createTransaction() {
- if (state == State.CLOSED) {
- throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
- }
+ checkNotClosed();
synchronized (this) {
final ClientTransaction ret = doCreateTransaction();
}
}
+ /**
+ * Create a new {@link ClientSnapshot}.
+ *
+ * @return A new {@link ClientSnapshot}
+ * @throws TransactionChainClosedException if this history is closed
+ * @throws IllegalStateException if a previous dependent transaction has not been closed
+ */
+ public final ClientSnapshot takeSnapshot() {
+ checkNotClosed();
+
+ synchronized (this) {
+ final ClientSnapshot ret = doCreateSnapshot();
+ openTransactions.put(ret.getIdentifier(), ret);
+ return ret;
+ }
+ }
+
+ @GuardedBy("this")
+ abstract ClientSnapshot doCreateSnapshot();
+
@GuardedBy("this")
abstract ClientTransaction doCreateTransaction();
* @param txId Transaction identifier
* @param cohort Transaction commit cohort
*/
- synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+ synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
final AbstractTransactionCommitCohort cohort) {
- final ClientTransaction tx = openTransactions.remove(txId);
- Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+ final TransactionIdentifier txId = tx.getIdentifier();
+ if (openTransactions.remove(txId) == null) {
+ LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
+ }
final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
* Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
* backend.
*
- * @param txId transaction identifier
+ * @param snapshot transaction identifier
*/
- synchronized void onTransactionAbort(final TransactionIdentifier txId) {
- if (openTransactions.remove(txId) == null) {
- LOG.warn("Could not find aborting transaction {}", txId);
+ synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
+ if (openTransactions.remove(snapshot.getIdentifier()) == null) {
+ LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
}
}