BUG-8618: add pause/unpause mechanics for tell-based protocol [33/60033/9]
author Robert Varga <robert.varga@pantheon.tech>
Thu, 6 Jul 2017 16:26:44 +0000 (18:26 +0200)
committer Robert Varga <robert.varga@pantheon.tech>
Sat, 15 Jul 2017 01:35:30 +0000 (03:35 +0200)
When we are transitioning to/from paused state, we need to remove
all frontend-related state, including pending transactions, to ensure
ShardDataTree does not track them.

When we change to unpaused leader, we can reconstruct the state
from the journal -- the rest will be forwarded from the frontend anyway.

Change-Id: I28d486d1a6695e21dd7e6518609680d54e5a15eb
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java

index 8563c3e913a87ea1de742b9a0b76b7ce3a1847e9..47f739330de4761ed55c87d7adaddcb0db99b8b4 100644 (file)
@@ -154,20 +154,25 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
         return tx.handleRequest(request, envelope, now);
     }
 
-    void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
+    final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
         tree.closeTransactionChain(getIdentifier(), () -> {
             envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
         });
     }
 
-    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+    final void purge(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
         tree.purgeTransactionChain(getIdentifier(), () -> {
             envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now);
         });
     }
 
+    final void retire() {
+        transactions.values().forEach(FrontendTransaction::retire);
+        tree.removeTransactionChain(getIdentifier());
+    }
+
     private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
             throws RequestException {
         if (request instanceof CommitLocalTransactionRequest) {
index 284acbab0b1c2ddfd27b371c093202a5bcb23fbc..b245ceee2d249ae0f841a69342412ce1eab8272e 100644 (file)
@@ -66,6 +66,11 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction {
         }
     }
 
+    @Override
+    void retire() {
+        // No-op
+    }
+
     private void handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope,
             final long now) {
         // The only valid request here is with abort protocol
index 6cacb325a03fc441f99496d07552cd58cfe867df..50e913025d6b9a7ab818393a47c61d4a384f202a 100644 (file)
@@ -122,6 +122,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
+    /**
+     * Retired state, needed to catch and suppress callbacks after we have removed associated state.
+     */
+    private static final class Retired extends State {
+        private final String prevStateString;
+
+        Retired(final State prevState) {
+            prevStateString = prevState.toString();
+        }
+
+        @Override
+        public String toString() {
+            return "RETIRED (in " + prevStateString + ")";
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
     private static final State ABORTED = new State() {
         @Override
@@ -196,6 +212,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
+    @Override
+    void retire() {
+        state = new Retired(state);
+    }
+
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         throwIfFailed();
@@ -211,10 +232,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
                     @Override
                     public void onSuccess(final DataTreeCandidate result) {
-                        LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
-                        recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(),
-                            request.getSequence()));
-                        ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+                        successfulPreCommit(envelope, now);
                     }
 
                     @Override
@@ -233,7 +251,26 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
-    private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+    void successfulPreCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(),
+                getIdentifier());
+            return;
+        }
+
+        final Ready ready = checkReady();
+        LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
+        recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(),
+            envelope.getMessage().getSequence()));
+        ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+    }
+
+    void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause);
+            return;
+        }
+
         recordAndSendFailure(envelope, now, cause);
         state = new Failed(cause);
         LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
@@ -343,10 +380,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
                     @Override
                     public void onSuccess(final Void result) {
-                        recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(),
-                            envelope.getMessage().getSequence()));
-                        ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
-                        LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+                        successfulCanCommit(envelope, now);
                     }
 
                     @Override
@@ -365,6 +399,20 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
+    void successfulCanCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(),
+                getIdentifier());
+            return;
+        }
+
+        final Ready ready = checkReady();
+        recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(),
+            envelope.getMessage().getSequence()));
+        ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
+        LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+    }
+
     private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
         throwIfFailed();
 
@@ -399,6 +447,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     }
 
     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
+
         final Ready ready = checkReady();
         ready.stage = CommitStage.PRE_COMMIT_PENDING;
         LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
@@ -416,6 +469,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     }
 
     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
+
         final Ready ready = checkReady();
         ready.stage = CommitStage.COMMIT_PENDING;
         LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
@@ -433,6 +491,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     }
 
     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
+
         recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
             envelope.getMessage().getSequence()));
         state = COMMITTED;
index 6c7ae07a3cf890c8cba9db65eba9e3fec349a405..fb56bf413a9b528cc706388279adc8761a21fee3 100644 (file)
@@ -153,6 +153,8 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
     abstract @Nullable TransactionSuccess<?> doHandleRequest(TransactionRequest<?> request, RequestEnvelope envelope,
             long now) throws RequestException;
 
+    abstract void retire();
+
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
             firstReplaySequence = sequence;
index d860bfa289e39f7b9a144aa652a5a030ec4c4546..061e035c6647c883c84916005e0c4b81b512820f 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.collect.RangeSet;
 import com.google.common.collect.TreeRangeSet;
 import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -32,6 +33,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -219,7 +221,25 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     void retire() {
-        // FIXME: flush all state
+        // Hunt down any transactions associated with this frontend
+        final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+        while (it.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = it.next();
+            if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+                if (cohort.getState() != State.COMMIT_PENDING) {
+                    LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+                    it.remove();
+                } else {
+                    LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
+                        cohort.getIdentifier());
+                }
+            }
+        }
+
+        // Clear out all transaction chains
+        localHistories.values().forEach(AbstractFrontendHistory::retire);
+        localHistories.clear();
+        standaloneHistory.retire();
     }
 
     @Override
index df9753e063b1c18a619ccd08b8bbfcf8d3b9c554..318a4e68e793e01c01cc0316099f8134d2aebec1 100644 (file)
@@ -181,6 +181,7 @@ public class Shard extends RaftActor {
 
     private final FrontendMetadata frontendMetadata;
     private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
+    private boolean paused;
 
     protected Shard(final AbstractBuilder<?, ?> builder) {
         super(builder.getId().toString(), builder.getPeerAddresses(),
@@ -452,10 +453,10 @@ public class Shard extends RaftActor {
     private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
-        if (!isLeader() || !isLeaderActive()) {
-            LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
-                            + "isLeadershipTransferInProgress: {}.",
-                    persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+        if (!isLeader() || paused || !isLeaderActive()) {
+            LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+                            + "isLeadershipTransferInProgress: {}, paused: {}",
+                    persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
             throw new NotLeaderException(getSelf());
         }
 
@@ -788,6 +789,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
+            paused = false;
             store.purgeLeaderState();
         }
 
@@ -799,19 +801,19 @@ public class Shard extends RaftActor {
     @Override
     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
+        paused = false;
 
-        final boolean hasLeader = hasLeader();
-        if (!hasLeader) {
-            // No leader implies we are not the leader, lose frontend state if we have any. This also places
-            // an explicit guard so the map will not get modified accidentally.
+        if (!isLeader()) {
             if (!knownFrontends.isEmpty()) {
                 LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
                 knownFrontends = ImmutableMap.of();
             }
-            return;
-        }
 
-        if (!isLeader()) {
+            if (!hasLeader()) {
+                // No leader anywhere, nothing else to do
+                return;
+            }
+
             // Another leader was elected. If we were the previous leader and had pending transactions, convert
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
@@ -854,13 +856,24 @@ public class Shard extends RaftActor {
     @Override
     protected void pauseLeader(final Runnable operation) {
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+        paused = true;
+
+        // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+        knownFrontends.values().forEach(LeaderFrontendState::retire);
+        knownFrontends = ImmutableMap.of();
+
         store.setRunOnPendingTransactionsComplete(operation);
     }
 
     @Override
     protected void unpauseLeader() {
         LOG.debug("{}: In unpauseLeader", persistenceId());
+        paused = false;
+
         store.setRunOnPendingTransactionsComplete(null);
+
+        // Restore tell-based protocol state as if we were becoming the leader
+        knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
     }
 
     @Override
index b0878e49ada98ef87fc5b4a6d8d0efbf1c468e24..4ae46fb9186b9a35af76b32776664119011c35c6 100644 (file)
@@ -1153,4 +1153,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardStats getStats() {
         return shard.getShardMBean();
     }
+
+    Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+        return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
+            e -> e.cohort).iterator();
+    }
+
+    void removeTransactionChain(final LocalHistoryIdentifier id) {
+        if (transactionChains.remove(id) != null) {
+            LOG.debug("{}: Removed transaction chain {}", logContext, id);
+        }
+    }
 }