X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FAbstractClientHistory.java;h=95552b382e21e4c25cf63d8db25ab3e9caec99d7;hb=refs%2Fchanges%2F49%2F85749%2F63;hp=dae12af731846a278386dcf75f8348e61eb30752;hpb=6c1d222b2f87af18e2488870b6708f91d5f6c6f8;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index dae12af731..95552b382e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -7,15 +7,22 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + +import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.StampedLock; -import javax.annotation.concurrent.GuardedBy; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; @@ -24,7 +31,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.mdsal.common.api.TransactionChainClosedException; +import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; @@ -58,8 +65,8 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id private final Map histories = new ConcurrentHashMap<>(); private final StampedLock lock = new StampedLock(); - private final AbstractDataStoreClientBehavior client; - private final LocalHistoryIdentifier identifier; + private final @NonNull AbstractDataStoreClientBehavior client; + private final @NonNull LocalHistoryIdentifier identifier; // Used via NEXT_TX_UPDATER @SuppressWarnings("unused") @@ -68,9 +75,9 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id private volatile State state = State.IDLE; AbstractClientHistory(final AbstractDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { - this.client = Preconditions.checkNotNull(client); - this.identifier = Preconditions.checkNotNull(identifier); - Preconditions.checkArgument(identifier.getCookie() == 0); + this.client = requireNonNull(client); + this.identifier = requireNonNull(identifier); + checkArgument(identifier.getCookie() == 0); } final State state() { @@ -79,14 +86,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id final void updateState(final State expected, final State next) { final boolean success = STATE_UPDATER.compareAndSet(this, expected, next); - Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state); + checkState(success, "Race condition detected, state changed from %s to %s", expected, state); LOG.debug("Client history {} changed state from {} to {}", this, expected, next); } final synchronized void doClose() { final State local = state; if (local != State.CLOSED) { - Preconditions.checkState(local == State.IDLE, "Local history %s has an open transaction", this); + checkState(local == State.IDLE, "Local history %s has an open transaction", this); histories.values().forEach(ProxyHistory::close); updateState(local, State.CLOSED); } @@ -98,7 +105,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } @Override - public LocalHistoryIdentifier getIdentifier() { + public final LocalHistoryIdentifier getIdentifier() { return identifier; } @@ -129,9 +136,10 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id /** * Create a new history proxy for a given shard. * + * @param shard Shard cookie * @throws InversibleLockException if the shard is being reconnected */ - @GuardedBy("lock") + @Holding("lock") private ProxyHistory createHistoryProxy(final Long shard) { final AbstractClientConnection connection = client.getConnection(shard); final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(), @@ -155,7 +163,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id LOG.debug("Create history response {}", response); } - private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) { + private @NonNull ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) { while (true) { try { // Short-lived lock to ensure exclusion of createHistoryProxy and the lookup phase in startReconnect, @@ -174,17 +182,19 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } } - final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) { + final @NonNull AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, + final Long shard) { return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { + final @NonNull AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, + final Long shard) { return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false); } private void checkNotClosed() { if (state == State.CLOSED) { - throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier)); + throw new DOMTransactionChainClosedException(String.format("Local history %s is closed", identifier)); } } @@ -192,10 +202,11 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * Allocate a new {@link ClientTransaction}. * * @return A new {@link ClientTransaction} - * @throws TransactionChainClosedException if this history is closed + * @throws DOMTransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ - public ClientTransaction createTransaction() { + // Non-final for mocking + public @NonNull ClientTransaction createTransaction() { checkNotClosed(); synchronized (this) { @@ -209,9 +220,10 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * Create a new {@link ClientSnapshot}. * * @return A new {@link ClientSnapshot} - * @throws TransactionChainClosedException if this history is closed + * @throws DOMTransactionChainClosedException if this history is closed * @throws IllegalStateException if a previous dependent transaction has not been closed */ + // Non-final for mocking public ClientSnapshot takeSnapshot() { checkNotClosed(); @@ -222,16 +234,38 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } } - @GuardedBy("this") + @Holding("this") abstract ClientSnapshot doCreateSnapshot(); - @GuardedBy("this") + @Holding("this") abstract ClientTransaction doCreateTransaction(); /** - * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission. + * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is + * completing with a set of participating shards. * * @param txId Transaction identifier + * @param participatingShards Participating shard cookies + */ + final void onTransactionShardsBound(final TransactionIdentifier txId, final Set participatingShards) { + // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those + // will not see the holes caused by this. + final long stamp = lock.readLock(); + try { + for (var entry : histories.entrySet()) { + if (!participatingShards.contains(entry.getKey())) { + entry.getValue().skipTransaction(txId); + } + } + } finally { + lock.unlockRead(stamp); + } + } + + /** + * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission. + * + * @param tx Client transaction * @param cohort Transaction commit cohort */ synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx, @@ -242,8 +276,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort); - Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", - cohort, txId, previous); + checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", cohort, txId, previous); LOG.debug("Local history {} readied transaction {}", this, txId); return cohort; @@ -267,13 +300,14 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id * * @param txId transaction identifier */ + // Non-final for mocking synchronized void onTransactionComplete(final TransactionIdentifier txId) { if (readyTransactions.remove(txId) == null) { LOG.warn("Could not find completed transaction {}", txId); } } - HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { + final HistoryReconnectCohort startReconnect(final ConnectedClientConnection newConn) { /* * This looks ugly and unusual and there is a reason for that, as the locking involved is in multiple places. * @@ -298,7 +332,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id return null; } - final ProxyReconnectCohort proxy = Verify.verifyNotNull(oldProxy.startReconnect(newConn)); + final ProxyReconnectCohort proxy = verifyNotNull(oldProxy.startReconnect(newConn)); return new HistoryReconnectCohort() { @Override ProxyReconnectCohort getProxy() { @@ -306,7 +340,7 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } @Override - void replayRequests(final Iterable previousEntries) { + void replayRequests(final Collection previousEntries) { proxy.replayRequests(previousEntries); } @@ -321,5 +355,4 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id } }; } - }