import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
}
@Override
- public LocalHistoryIdentifier getIdentifier() {
+ public final LocalHistoryIdentifier getIdentifier() {
return identifier;
}
/**
* Create a new history proxy for a given shard.
*
+ * @param shard Shard cookie
* @throws InversibleLockException if the shard is being reconnected
*/
@Holding("lock")
* @throws DOMTransactionChainClosedException if this history is closed
* @throws IllegalStateException if a previous dependent transaction has not been closed
*/
+ // Non-final for mocking
public @NonNull ClientTransaction createTransaction() {
checkNotClosed();
* @throws DOMTransactionChainClosedException if this history is closed
* @throws IllegalStateException if a previous dependent transaction has not been closed
*/
+ // Non-final for mocking
public ClientSnapshot takeSnapshot() {
checkNotClosed();
abstract ClientTransaction doCreateTransaction();
/**
- * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+ * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
+ * completing with a set of participating shards.
*
* @param txId Transaction identifier
+ * @param participatingShards Participating shard cookies
+ */
+ final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
+ // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
+ // will not see the holes caused by this.
+ final long stamp = lock.readLock();
+ try {
+ for (var entry : histories.entrySet()) {
+ if (!participatingShards.contains(entry.getKey())) {
+ entry.getValue().skipTransaction(txId);
+ }
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ }
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+ *
+ * @param tx Client transaction
* @param cohort Transaction commit cohort
*/
synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
*
* @param txId transaction identifier
*/
+ // Non-final for mocking
synchronized void onTransactionComplete(final TransactionIdentifier txId) {
if (readyTransactions.remove(txId) == null) {
LOG.warn("Could not find completed transaction {}", txId);
}
}
- HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+ final HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
/*
* This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
*
}
};
}
-
}