X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractClientHistory.java;h=796c23614e2fa220660bb09c6e7db0f84162b8df;hb=refs%2Fchanges%2F78%2F100478%2F3;hp=1e8d03a8eccfc96594c4d06ca4c050fb736b9f8d;hpb=b74c6012092e47430a8f4d6f4ddeb1d3e2b1b7df;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 1e8d03a8ec..796c23614e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -7,16 +7,23 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.StampedLock; -import javax.annotation.concurrent.GuardedBy; +import java.util.stream.Stream; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; @@ -25,7 +32,8 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.mdsal.common.api.TransactionChainClosedException; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; @@ -59,8 +67,8 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id private final Map histories = new ConcurrentHashMap<>(); private final StampedLock lock = new StampedLock(); - private final AbstractDataStoreClientBehavior client; - private final LocalHistoryIdentifier identifier; + private final @NonNull AbstractDataStoreClientBehavior client; + private final @NonNull LocalHistoryIdentifier identifier; // Used via NEXT_TX_UPDATER @SuppressWarnings("unused") @@ -69,9 +77,9 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id private volatile State state = State.IDLE; AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { - this.client = Preconditions.checkNotNull(client); - this.identifier = Preconditions.checkNotNull(identifier); - Preconditions.checkArgument(identifier.getCookie() == 0); + this.client = requireNonNull(client); + this.identifier = requireNonNull(identifier); + checkArgument(identifier.getCookie() == 0); } final State state() { @@ -80,14 +88,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id final void updateState(final State expected, final State next) { final boolean success = STATE_UPDATER.compareAndSet(this, expected, next); - Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state); + checkState(success, "Race condition detected, state changed from %s to %s", expected, state); LOG.debug("Client history {} changed state from {} to {}", this, expected, next); } final synchronized void doClose() { final State local = state; if (local != State.CLOSED) { - Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this); + checkState(local == State.IDLE, "Local history %s has an open transaction", this); histories.values().forEach(ProxyHistory::close); updateState(local, State.CLOSED); } @@ -99,7 +107,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } @Override - public LocalHistoryIdentifier getIdentifier() { + public final LocalHistoryIdentifier getIdentifier() { return identifier; } @@ -111,6 +119,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id return client.resolveShardForPath(path); } + final Stream resolveAllShards() { + return client.resolveAllShards(); + } + + final ActorUtils actorUtils() { + return client.actorUtils(); + } + @Override final void localAbort(final Throwable cause) { final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED); @@ -130,9 +146,10 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id /** * Create a new history proxy for a given shard. * + * @param shard Shard cookie * @throws InversibleLockException if the shard is being reconnected */ - @GuardedBy("lock") + @Holding("lock") private ProxyHistory createHistoryProxy(final Long shard) { final AbstractClientConnection connection = client.getConnection(shard); final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(), @@ -156,7 +173,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id LOG.debug("Create history response {}", response); } - private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) { + private @NonNull ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) { while (true) { try { // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect, @@ -175,17 +192,19 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } } - final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) { + final @NonNull AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, + final Long shard) { return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { + final @NonNull AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, + final Long shard) { return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false); } private void checkNotClosed() { if (state == State.CLOSED) { - throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier)); + throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier)); } } @@ -193,10 +212,11 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * Allocate a new {@link ClientTransaction}. * * @return A new {@link ClientTransaction} - * @throws TransactionChainClosedException if this history is closed + * @throws DOMTransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ - public ClientTransaction createTransaction() { + // Non-final for mocking + public @NonNull ClientTransaction createTransaction() { checkNotClosed(); synchronized (this) { @@ -210,9 +230,10 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * Create a new {@link ClientSnapshot}. * * @return A new {@link ClientSnapshot} - * @throws TransactionChainClosedException if this history is closed + * @throws DOMTransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ + // Non-final for mocking public ClientSnapshot takeSnapshot() { checkNotClosed(); @@ -223,16 +244,38 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } } - @GuardedBy("this") + @Holding("this") abstract ClientSnapshot doCreateSnapshot(); - @GuardedBy("this") + @Holding("this") abstract ClientTransaction doCreateTransaction(); /** - * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission. + * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is + * completing with a set of participating shards. * * @param txId Transaction identifier + * @param participatingShards Participating shard cookies + */ + final void onTransactionShardsBound(final TransactionIdentifier txId, final Set participatingShards) { + // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those + // will not see the holes caused by this. + final long stamp = lock.readLock(); + try { + for (var entry : histories.entrySet()) { + if (!participatingShards.contains(entry.getKey())) { + entry.getValue().skipTransaction(txId); + } + } + } finally { + lock.unlockRead(stamp); + } + } + + /** + * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission. + * + * @param tx Client transaction * @param cohort Transaction commit cohort */ synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx, @@ -243,8 +286,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort); - Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", - cohort, txId, previous); + checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", cohort, txId, previous); LOG.debug("Local history {} readied transaction {}", this, txId); return cohort; @@ -268,13 +310,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * * @param txId transaction identifier */ + // Non-final for mocking synchronized void onTransactionComplete(final TransactionIdentifier txId) { if (readyTransactions.remove(txId) == null) { LOG.warn("Could not find completed transaction {}", txId); } } - HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { + final HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { /* * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places. * @@ -299,7 +342,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id return null; } - final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn)); + final ProxyReconnectCohort proxy = verifyNotNull(oldProxy.startReconnect(newConn)); return new HistoryReconnectCohort() { @Override ProxyReconnectCohort getProxy() { @@ -322,5 +365,4 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } }; } - }