import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.StampedLock;
+import java.util.stream.Stream;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
}
@Override
- public LocalHistoryIdentifier getIdentifier() {
+ public final LocalHistoryIdentifier getIdentifier() {
return identifier;
}
return client.resolveShardForPath(path);
}
+ final Stream<Long> resolveAllShards() {
+ return client.resolveAllShards();
+ }
+
+ final ActorUtils actorUtils() {
+ return client.actorUtils();
+ }
+
@Override
final void localAbort(final Throwable cause) {
final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
/**
* Create a new history proxy for a given shard.
*
+ * @param shard Shard cookie
* @throws InversibleLockException if the shard is being reconnected
*/
@Holding("lock")
* @throws DOMTransactionChainClosedException if this history is closed
* @throws IllegalStateException if a previous dependent transaction has not been closed
*/
+ // Non-final for mocking
public @NonNull ClientTransaction createTransaction() {
checkNotClosed();
* @throws DOMTransactionChainClosedException if this history is closed
* @throws IllegalStateException if a previous dependent transaction has not been closed
*/
+ // Non-final for mocking
public ClientSnapshot takeSnapshot() {
checkNotClosed();
abstract ClientTransaction doCreateTransaction();
/**
- * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+ * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
+ * completing with a set of participating shards.
*
* @param txId Transaction identifier
+ * @param participatingShards Participating shard cookies
+ */
+ final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
+ // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
+ // will not see the holes caused by this.
+ final long stamp = lock.readLock();
+ try {
+ for (var entry : histories.entrySet()) {
+ if (!participatingShards.contains(entry.getKey())) {
+ entry.getValue().skipTransaction(txId);
+ }
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ }
+
+ /**
+ * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+ *
+ * @param tx Client transaction
* @param cohort Transaction commit cohort
*/
synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx,
*
* @param txId transaction identifier
*/
+ // Non-final for mocking
synchronized void onTransactionComplete(final TransactionIdentifier txId) {
if (readyTransactions.remove(txId) == null) {
LOG.warn("Could not find completed transaction {}", txId);
}
}
- HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
+ final HistoryReconnectCohort startReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn) {
/*
* This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places.
*
}
};
}
-
}