+
+ /**
+ * Create a new {@link ClientSnapshot}.
+ *
+ * @return A new {@link ClientSnapshot}
+ * @throws TransactionChainClosedException if this history is closed
+ * @throws IllegalStateException if a previous dependent transaction has not been closed
+ */
+ public ClientSnapshot takeSnapshot() {
+ checkNotClosed();
+
+ synchronized (this) {
+ final ClientSnapshot ret = doCreateSnapshot();
+ openTransactions.put(ret.getIdentifier(), ret);
+ return ret;
+ }
+ }
+
+ @GuardedBy("this")
+ abstract ClientSnapshot doCreateSnapshot();
+
+ @GuardedBy("this")
+ abstract ClientTransaction doCreateTransaction();
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+ *
+ * @param txId Transaction identifier
+ * @param cohort Transaction commit cohort
+ */
+ synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
+ final AbstractTransactionCommitCohort cohort) {
+ final TransactionIdentifier txId = tx.getIdentifier();
+ if (openTransactions.remove(txId) == null) {
+ LOG.warn("Transaction {} not recorded, proceeding with readiness", txId);
+ }
+
+ final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
+ Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
+ cohort, txId, previous);
+
+ LOG.debug("Local history {} readied transaction {}", this, txId);
+ return cohort;
+ }
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
+ * backend.
+ *
+ * @param snapshot transaction identifier
+ */
+ synchronized void onTransactionAbort(final AbstractClientHandle<?> snapshot) {
+ if (openTransactions.remove(snapshot.getIdentifier()) == null) {
+ LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier());
+ }
+ }
+
+ /**
+ * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
+ * and all its state can be removed.
+ *
+ * @param txId transaction identifier
+ */
+ synchronized void onTransactionComplete(final TransactionIdentifier txId) {
+ if (readyTransactions.remove(txId) == null) {
+ LOG.warn("Could not find completed transaction {}", txId);
+ }
+ }
+
+ HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+ /*
+ * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
+ *
+ * We need to make sure that a new proxy is not created while we are reconnecting, which is partially satisfied
+ * by client.getConnection() throwing InversibleLockException by the time this method is invoked. That does
+ * not cover the case when createHistoryProxy() has already acquired the connection, but has not yet populated
+ * the history map.
+ *
+ * Hence we need to make sure no potential computation is happening concurrently with us looking at the history
+ * map. Once we have performed that lookup, though, we can release the lock immediately, as all creation
+ * requests are established to happen either before or after the reconnect attempt.
+ */
+ final ProxyHistory oldProxy;
+ final long stamp = lock.writeLock();
+ try {
+ oldProxy = histories.get(newConn.cookie());
+ } finally {
+ lock.unlockWrite(stamp);
+ }
+
+ if (oldProxy == null) {
+ return null;
+ }
+
+ final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn));
+ return new HistoryReconnectCohort() {
+ @Override
+ ProxyReconnectCohort getProxy() {
+ return proxy;
+ }
+
+ @Override
+ void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+ proxy.replayRequests(previousEntries);
+ }
+
+ @Override
+ public void close() {
+ LOG.debug("Client history {} finishing reconnect to {}", AbstractClientHistory.this, newConn);
+ final ProxyHistory newProxy = proxy.finishReconnect();
+ if (!histories.replace(newConn.cookie(), oldProxy, newProxy)) {
+ LOG.warn("Failed to replace proxy {} with {} in {}", oldProxy, newProxy,
+ AbstractClientHistory.this);
+ }
+ }
+ };
+ }
+