From: Robert Varga Date: Thu, 6 Jul 2017 16:26:44 +0000 (+0200) Subject: BUG-8618: add pause/unpause mechanics for tell-based protocol X-Git-Tag: release/carbon-sr2~44 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F33%2F60033%2F9 BUG-8618: add pause/unpause mechanics for tell-based protocol When we are transitioning to/from paused state, we need to remove all frontend-related state, including pending transactions, to ensure ShardDataTree does not track them. When we change to unpaused leader, we can reconstruct the state from the journal -- the rest will be forwarded from the frontend anyway. Change-Id: I28d486d1a6695e21dd7e6518609680d54e5a15eb Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 8563c3e913..47f739330d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -154,20 +154,25 @@ abstract class AbstractFrontendHistory implements Identifiable { envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); }); } - void purge(final long sequence, final RequestEnvelope envelope, final long now) { + final void purge(final long sequence, final RequestEnvelope envelope, final long now) { LOG.debug("{}: purging history {}", persistenceId(), getIdentifier()); tree.purgeTransactionChain(getIdentifier(), () -> { envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now); }); } + final void retire() { + transactions.values().forEach(FrontendTransaction::retire); + tree.removeTransactionChain(getIdentifier()); + } + private FrontendTransaction createTransaction(final TransactionRequest request, final TransactionIdentifier id) throws RequestException { if (request instanceof CommitLocalTransactionRequest) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java index 284acbab0b..b245ceee2d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java @@ -66,6 +66,11 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction { } } + @Override + void retire() { + // No-op + } + private void handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope, final long now) { // The only valid request here is with abort protocol diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 6cacb325a0..50e913025d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -122,6 +122,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + /** + * Retired state, needed to catch and suppress callbacks after we have removed associated state. + */ + private static final class Retired extends State { + private final String prevStateString; + + Retired(final State prevState) { + prevStateString = prevState.toString(); + } + + @Override + public String toString() { + return "RETIRED (in " + prevStateString + ")"; + } + } + private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class); private static final State ABORTED = new State() { @Override @@ -196,6 +212,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + @Override + void retire() { + state = new Retired(state); + } + private void handleTransactionPreCommit(final TransactionPreCommitRequest request, final RequestEnvelope envelope, final long now) throws RequestException { throwIfFailed(); @@ -211,10 +232,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { ready.readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { - LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); - recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(), - request.getSequence())); - ready.stage = CommitStage.PRE_COMMIT_COMPLETE; + successfulPreCommit(envelope, now); } @Override @@ -233,7 +251,26 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } - private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + void successfulPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; + } + + final Ready ready = checkReady(); + LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); + recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.PRE_COMMIT_COMPLETE; + } + + void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause); + return; + } + recordAndSendFailure(envelope, now, cause); state = new Failed(cause); LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause); @@ -343,10 +380,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { checkReady().readyCohort.canCommit(new FutureCallback() { @Override public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(), - envelope.getMessage().getSequence())); - ready.stage = CommitStage.CAN_COMMIT_COMPLETE; - LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); + successfulCanCommit(envelope, now); } @Override @@ -365,6 +399,20 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + void successfulCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; + } + + final Ready ready = checkReady(); + recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.CAN_COMMIT_COMPLETE; + LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); + } + private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException { throwIfFailed(); @@ -399,6 +447,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); ready.stage = CommitStage.PRE_COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier()); @@ -416,6 +469,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); ready.stage = CommitStage.COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier()); @@ -433,6 +491,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier()); + return; + } + recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(), envelope.getMessage().getSequence())); state = COMMITTED; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java index 6c7ae07a3c..fb56bf413a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -153,6 +153,8 @@ abstract class FrontendTransaction implements Identifiable doHandleRequest(TransactionRequest request, RequestEnvelope envelope, long now) throws RequestException; + abstract void retire(); + private void recordResponse(final long sequence, final Object response) { if (replayQueue.isEmpty()) { firstReplaySequence = sequence; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index d860bfa289..061e035c66 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -14,6 +14,7 @@ import com.google.common.collect.RangeSet; import com.google.common.collect.TreeRangeSet; import com.google.common.primitives.UnsignedLong; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -32,6 +33,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,7 +221,25 @@ final class LeaderFrontendState implements Identifiable { } void retire() { - // FIXME: flush all state + // Hunt down any transactions associated with this frontend + final Iterator it = tree.cohortIterator(); + while (it.hasNext()) { + final SimpleShardDataTreeCohort cohort = it.next(); + if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) { + if (cohort.getState() != State.COMMIT_PENDING) { + LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier()); + it.remove(); + } else { + LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, + cohort.getIdentifier()); + } + } + } + + // Clear out all transaction chains + localHistories.values().forEach(AbstractFrontendHistory::retire); + localHistories.clear(); + standaloneHistory.retire(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index df9753e063..318a4e68e7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -181,6 +181,7 @@ public class Shard extends RaftActor { private final FrontendMetadata frontendMetadata; private Map knownFrontends = ImmutableMap.of(); + private boolean paused; protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -452,10 +453,10 @@ public class Shard extends RaftActor { private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) throws RequestException { // We are not the leader, hence we want to fail-fast. - if (!isLeader() || !isLeaderActive()) { - LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," - + "isLeadershipTransferInProgress: {}.", - persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress()); + if (!isLeader() || paused || !isLeaderActive()) { + LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}, paused: {}", + persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused); throw new NotLeaderException(getSelf()); } @@ -788,6 +789,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } + paused = false; store.purgeLeaderState(); } @@ -799,19 +801,19 @@ public class Shard extends RaftActor { @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); + paused = false; - final boolean hasLeader = hasLeader(); - if (!hasLeader) { - // No leader implies we are not the leader, lose frontend state if we have any. This also places - // an explicit guard so the map will not get modified accidentally. + if (!isLeader()) { if (!knownFrontends.isEmpty()) { LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet()); knownFrontends = ImmutableMap.of(); } - return; - } - if (!isLeader()) { + if (!hasLeader()) { + // No leader anywhere, nothing else to do + return; + } + // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); @@ -854,13 +856,24 @@ public class Shard extends RaftActor { @Override protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); + paused = true; + + // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused. + knownFrontends.values().forEach(LeaderFrontendState::retire); + knownFrontends = ImmutableMap.of(); + store.setRunOnPendingTransactionsComplete(operation); } @Override protected void unpauseLeader() { LOG.debug("{}: In unpauseLeader", persistenceId()); + paused = false; + store.setRunOnPendingTransactionsComplete(null); + + // Restore tell-based protocol state as if we were becoming the leader + knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index b0878e49ad..4ae46fb918 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -1153,4 +1153,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ShardStats getStats() { return shard.getShardMBean(); } + + Iterator cohortIterator() { + return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), + e -> e.cohort).iterator(); + } + + void removeTransactionChain(final LocalHistoryIdentifier id) { + if (transactionChains.remove(id) != null) { + LOG.debug("{}: Removed transaction chain {}", logContext, id); + } + } }