From 64bc1360aedb83583edb354444ee3e4295c7a5e6 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 6 Jul 2017 18:26:44 +0200 Subject: [PATCH] BUG-8618: add pause/unpause mechanics for tell-based protocol When we are transitioning to/from paused state, we need to remove all frontend-related state, including pending transactions, to ensure ShardDataTree does not track them. When we change to unpaused leader, we can reconstruct the state from the journal -- the rest will be forwarded from the frontend anyway. Change-Id: I28d486d1a6695e21dd7e6518609680d54e5a15eb Signed-off-by: Robert Varga (cherry picked from commit 40d27d44d6f0b0358505b2e8ac5abbad25f47d4b) --- .../datastore/AbstractFrontendHistory.java | 9 ++- .../FrontendReadOnlyTransaction.java | 5 ++ .../FrontendReadWriteTransaction.java | 81 ++++++++++++++++--- .../datastore/FrontendTransaction.java | 2 + .../datastore/LeaderFrontendState.java | 22 ++++- .../controller/cluster/datastore/Shard.java | 35 +++++--- .../cluster/datastore/ShardDataTree.java | 11 +++ 7 files changed, 142 insertions(+), 23 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 1aad9e9982..c1d2db3094 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -154,18 +154,23 @@ abstract class AbstractFrontendHistory implements Identifiable envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now)); } - void purge(final long sequence, final RequestEnvelope envelope, final long now) { + final void purge(final long sequence, final RequestEnvelope envelope, final long now) { LOG.debug("{}: purging history {}", persistenceId(), getIdentifier()); tree.purgeTransactionChain(getIdentifier(), () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now)); } + final void retire() { + transactions.values().forEach(FrontendTransaction::retire); + tree.removeTransactionChain(getIdentifier()); + } + private FrontendTransaction createTransaction(final TransactionRequest request, final TransactionIdentifier id) throws RequestException { if (request instanceof CommitLocalTransactionRequest) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java index 284acbab0b..b245ceee2d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java @@ -66,6 +66,11 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction { } } + @Override + void retire() { + // No-op + } + private void handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope, final long now) { // The only valid request here is with abort protocol diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java index 6cacb325a0..50e913025d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -122,6 +122,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + /** + * Retired state, needed to catch and suppress callbacks after we have removed associated state. + */ + private static final class Retired extends State { + private final String prevStateString; + + Retired(final State prevState) { + prevStateString = prevState.toString(); + } + + @Override + public String toString() { + return "RETIRED (in " + prevStateString + ")"; + } + } + private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class); private static final State ABORTED = new State() { @Override @@ -196,6 +212,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + @Override + void retire() { + state = new Retired(state); + } + private void handleTransactionPreCommit(final TransactionPreCommitRequest request, final RequestEnvelope envelope, final long now) throws RequestException { throwIfFailed(); @@ -211,10 +232,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { ready.readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { - LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); - recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(), - request.getSequence())); - ready.stage = CommitStage.PRE_COMMIT_COMPLETE; + successfulPreCommit(envelope, now); } @Override @@ -233,7 +251,26 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } - private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + void successfulPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; + } + + final Ready ready = checkReady(); + LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier()); + recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.PRE_COMMIT_COMPLETE; + } + + void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause); + return; + } + recordAndSendFailure(envelope, now, cause); state = new Failed(cause); LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause); @@ -343,10 +380,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { checkReady().readyCohort.canCommit(new FutureCallback() { @Override public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(), - envelope.getMessage().getSequence())); - ready.stage = CommitStage.CAN_COMMIT_COMPLETE; - LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); + successfulCanCommit(envelope, now); } @Override @@ -365,6 +399,20 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } } + void successfulCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(), + getIdentifier()); + return; + } + + final Ready ready = checkReady(); + recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(), + envelope.getMessage().getSequence())); + ready.stage = CommitStage.CAN_COMMIT_COMPLETE; + LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier()); + } + private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException { throwIfFailed(); @@ -399,6 +447,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); ready.stage = CommitStage.PRE_COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier()); @@ -416,6 +469,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier()); + return; + } + final Ready ready = checkReady(); ready.stage = CommitStage.COMMIT_PENDING; LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier()); @@ -433,6 +491,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction { } void successfulCommit(final RequestEnvelope envelope, final long startTime) { + if (state instanceof Retired) { + LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier()); + return; + } + recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(), envelope.getMessage().getSequence())); state = COMMITTED; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java index 94093195c9..2a4aeaa49b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -157,6 +157,8 @@ abstract class FrontendTransaction implements Identifiable doHandleRequest(TransactionRequest request, RequestEnvelope envelope, long now) throws RequestException; + abstract void retire(); + private void recordResponse(final long sequence, final Object response) { if (replayQueue.isEmpty()) { firstReplaySequence = sequence; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index ba2bdb3c7b..5a5e42637e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -28,6 +29,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State; import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; @@ -215,7 +217,25 @@ final class LeaderFrontendState implements Identifiable { } void retire() { - // FIXME: flush all state + // Hunt down any transactions associated with this frontend + final Iterator it = tree.cohortIterator(); + while (it.hasNext()) { + final SimpleShardDataTreeCohort cohort = it.next(); + if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) { + if (cohort.getState() != State.COMMIT_PENDING) { + LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier()); + it.remove(); + } else { + LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, + cohort.getIdentifier()); + } + } + } + + // Clear out all transaction chains + localHistories.values().forEach(AbstractFrontendHistory::retire); + localHistories.clear(); + standaloneHistory.retire(); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 10cef862fc..d9483d7b2b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -185,6 +185,7 @@ public class Shard extends RaftActor { private final FrontendMetadata frontendMetadata; private Map knownFrontends = ImmutableMap.of(); + private boolean paused; private final MessageSlicer responseMessageSlicer; private final Dispatchers dispatchers; @@ -478,10 +479,10 @@ public class Shard extends RaftActor { private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) throws RequestException { // We are not the leader, hence we want to fail-fast. - if (!isLeader() || !isLeaderActive()) { - LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," - + "isLeadershipTransferInProgress: {}.", - persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress()); + if (!isLeader() || paused || !isLeaderActive()) { + LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {}," + + "isLeadershipTransferInProgress: {}, paused: {}", + persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused); throw new NotLeaderException(getSelf()); } @@ -814,6 +815,7 @@ public class Shard extends RaftActor { persistenceId(), getId()); } + paused = false; store.purgeLeaderState(); } @@ -825,19 +827,19 @@ public class Shard extends RaftActor { @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) { shardMBean.incrementLeadershipChangeCount(); + paused = false; - final boolean hasLeader = hasLeader(); - if (!hasLeader) { - // No leader implies we are not the leader, lose frontend state if we have any. This also places - // an explicit guard so the map will not get modified accidentally. + if (!isLeader()) { if (!knownFrontends.isEmpty()) { LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet()); knownFrontends = ImmutableMap.of(); } - return; - } - if (!isLeader()) { + if (!hasLeader()) { + // No leader anywhere, nothing else to do + return; + } + // Another leader was elected. If we were the previous leader and had pending transactions, convert // them to transaction messages and send to the new leader. ActorSelection leader = getLeader(); @@ -880,13 +882,24 @@ public class Shard extends RaftActor { @Override protected void pauseLeader(final Runnable operation) { LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation); + paused = true; + + // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused. + knownFrontends.values().forEach(LeaderFrontendState::retire); + knownFrontends = ImmutableMap.of(); + store.setRunOnPendingTransactionsComplete(operation); } @Override protected void unpauseLeader() { LOG.debug("{}: In unpauseLeader", persistenceId()); + paused = false; + store.setRunOnPendingTransactionsComplete(null); + + // Restore tell-based protocol state as if we were becoming the leader + knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this)); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 27577b27d8..7aecda48db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -1133,4 +1133,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { ShardStats getStats() { return shard.getShardMBean(); } + + Iterator cohortIterator() { + return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions), + e -> e.cohort).iterator(); + } + + void removeTransactionChain(final LocalHistoryIdentifier id) { + if (transactionChains.remove(id) != null) { + LOG.debug("{}: Removed transaction chain {}", logContext, id); + } + } } -- 2.36.6