X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractClientHistory.java;h=796c23614e2fa220660bb09c6e7db0f84162b8df;hb=refs%2Fchanges%2F78%2F100478%2F3;hp=d445d3c2f6f9e542ec031c2737653317e3952a73;hpb=d502ae8e1529b69af55a59c3e664a02457c05ec6;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index d445d3c2f6..796c23614e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -15,10 +15,12 @@ import static java.util.Objects.requireNonNull; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.StampedLock; +import java.util.stream.Stream; import org.checkerframework.checker.lock.qual.GuardedBy; import org.checkerframework.checker.lock.qual.Holding; import org.eclipse.jdt.annotation.NonNull; @@ -30,6 +32,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -104,7 +107,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } @Override - public LocalHistoryIdentifier getIdentifier() { + public final LocalHistoryIdentifier getIdentifier() { return identifier; } @@ -116,6 +119,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id return client.resolveShardForPath(path); } + final Stream resolveAllShards() { + return client.resolveAllShards(); + } + + final ActorUtils actorUtils() { + return client.actorUtils(); + } + @Override final void localAbort(final Throwable cause) { final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED); @@ -135,6 +146,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id /** * Create a new history proxy for a given shard. * + * @param shard Shard cookie * @throws InversibleLockException if the shard is being reconnected */ @Holding("lock") @@ -203,6 +215,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * @throws DOMTransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ + // Non-final for mocking public @NonNull ClientTransaction createTransaction() { checkNotClosed(); @@ -220,6 +233,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * @throws DOMTransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ + // Non-final for mocking public ClientSnapshot takeSnapshot() { checkNotClosed(); @@ -237,9 +251,31 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id abstract ClientTransaction doCreateTransaction(); /** - * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission. + * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is + * completing with a set of participating shards. * * @param txId Transaction identifier + * @param participatingShards Participating shard cookies + */ + final void onTransactionShardsBound(final TransactionIdentifier txId, final Set participatingShards) { + // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those + // will not see the holes caused by this. + final long stamp = lock.readLock(); + try { + for (var entry : histories.entrySet()) { + if (!participatingShards.contains(entry.getKey())) { + entry.getValue().skipTransaction(txId); + } + } + } finally { + lock.unlockRead(stamp); + } + } + + /** + * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission. + * + * @param tx Client transaction * @param cohort Transaction commit cohort */ synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx, @@ -274,13 +310,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * * @param txId transaction identifier */ + // Non-final for mocking synchronized void onTransactionComplete(final TransactionIdentifier txId) { if (readyTransactions.remove(txId) == null) { LOG.warn("Could not find completed transaction {}", txId); } } - HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { + final HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { /* * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places. * @@ -328,5 +365,4 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } }; } - }