import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.StampedLock;
+import java.util.stream.Stream;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
return client.resolveShardForPath(path);
}
+ final Stream<Long> resolveAllShards() {
+ return client.resolveAllShards();
+ }
+
+ final ActorUtils actorUtils() {
+ return client.actorUtils();
+ }
+
@Override
final void localAbort(final Throwable cause) {
final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
@Holding("this")
abstract ClientTransaction doCreateTransaction();
+ /**
+ * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
+ * completing with a set of participating shards.
+ *
+ * @param txId Transaction identifier
+ * @param participatingShards Participating shard cookies
+ */
+ final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
+ // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
+ // will not see the holes caused by this.
+ final long stamp = lock.readLock();
+ try {
+ for (var entry : histories.entrySet()) {
+ if (!participatingShards.contains(entry.getKey())) {
+ entry.getValue().skipTransaction(txId);
+ }
+ }
+ } finally {
+ lock.unlockRead(stamp);
+ }
+ }
+
/**
* Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
*