BUG-8618: add pause/unpause mechanics for tell-based protocol 36/60436/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 6 Jul 2017 16:26:44 +0000 (18:26 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 16 Jul 2017 21:39:00 +0000 (23:39 +0200)
When we are transitioning to/from paused state, we need to remove
all frontend-related state, including pending transactions, to ensure
ShardDataTree does not track them.

When we change to unpaused leader, we can reconstruct the state
from the journal -- the rest will be forwarded from the frontend anyway.

Change-Id: I28d486d1a6695e21dd7e6518609680d54e5a15eb
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 40d27d44d6f0b0358505b2e8ac5abbad25f47d4b)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java

index 1aad9e99823540ab9fd53fe9f8c89a82e67e3054..c1d2db309410dd831a31b3ec4845f02534749c9b 100644 (file)
@@ -154,18 +154,23 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
         return tx.handleRequest(request, envelope, now);
     }
 
         return tx.handleRequest(request, envelope, now);
     }
 
-    void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
+    final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
         tree.closeTransactionChain(getIdentifier(),
             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
     }
 
         LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
         tree.closeTransactionChain(getIdentifier(),
             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
     }
 
-    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+    final void purge(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
         tree.purgeTransactionChain(getIdentifier(),
             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
     }
 
         LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
         tree.purgeTransactionChain(getIdentifier(),
             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
     }
 
+    final void retire() {
+        transactions.values().forEach(FrontendTransaction::retire);
+        tree.removeTransactionChain(getIdentifier());
+    }
+
     private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
             throws RequestException {
         if (request instanceof CommitLocalTransactionRequest) {
     private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
             throws RequestException {
         if (request instanceof CommitLocalTransactionRequest) {
index 284acbab0b1c2ddfd27b371c093202a5bcb23fbc..b245ceee2d249ae0f841a69342412ce1eab8272e 100644 (file)
@@ -66,6 +66,11 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction {
         }
     }
 
         }
     }
 
+    @Override
+    void retire() {
+        // No-op
+    }
+
     private void handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope,
             final long now) {
         // The only valid request here is with abort protocol
     private void handleModifyTransaction(final ModifyTransactionRequest request, final RequestEnvelope envelope,
             final long now) {
         // The only valid request here is with abort protocol
index 6cacb325a03fc441f99496d07552cd58cfe867df..50e913025d6b9a7ab818393a47c61d4a384f202a 100644 (file)
@@ -122,6 +122,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
         }
     }
 
+    /**
+     * Retired state, needed to catch and suppress callbacks after we have removed associated state.
+     */
+    private static final class Retired extends State {
+        private final String prevStateString;
+
+        Retired(final State prevState) {
+            prevStateString = prevState.toString();
+        }
+
+        @Override
+        public String toString() {
+            return "RETIRED (in " + prevStateString + ")";
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
     private static final State ABORTED = new State() {
         @Override
     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
     private static final State ABORTED = new State() {
         @Override
@@ -196,6 +212,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
         }
     }
 
+    @Override
+    void retire() {
+        state = new Retired(state);
+    }
+
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         throwIfFailed();
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         throwIfFailed();
@@ -211,10 +232,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
                     @Override
                     public void onSuccess(final DataTreeCandidate result) {
                 ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
                     @Override
                     public void onSuccess(final DataTreeCandidate result) {
-                        LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
-                        recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(),
-                            request.getSequence()));
-                        ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+                        successfulPreCommit(envelope, now);
                     }
 
                     @Override
                     }
 
                     @Override
@@ -233,7 +251,26 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
         }
     }
 
-    private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+    void successfulPreCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing successful preCommit of retired transaction {}", persistenceId(),
+                getIdentifier());
+            return;
+        }
+
+        final Ready ready = checkReady();
+        LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
+        recordAndSendSuccess(envelope, startTime, new TransactionPreCommitSuccess(getIdentifier(),
+            envelope.getMessage().getSequence()));
+        ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+    }
+
+    void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing failure of retired transaction {}", persistenceId(), getIdentifier(), cause);
+            return;
+        }
+
         recordAndSendFailure(envelope, now, cause);
         state = new Failed(cause);
         LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
         recordAndSendFailure(envelope, now, cause);
         state = new Failed(cause);
         LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
@@ -343,10 +380,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
                     @Override
                     public void onSuccess(final Void result) {
                 checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
                     @Override
                     public void onSuccess(final Void result) {
-                        recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(),
-                            envelope.getMessage().getSequence()));
-                        ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
-                        LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+                        successfulCanCommit(envelope, now);
                     }
 
                     @Override
                     }
 
                     @Override
@@ -365,6 +399,20 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
         }
     }
 
+    void successfulCanCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing successful canCommit of retired transaction {}", persistenceId(),
+                getIdentifier());
+            return;
+        }
+
+        final Ready ready = checkReady();
+        recordAndSendSuccess(envelope, startTime, new TransactionCanCommitSuccess(getIdentifier(),
+            envelope.getMessage().getSequence()));
+        ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
+        LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+    }
+
     private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
         throwIfFailed();
 
     private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
         throwIfFailed();
 
@@ -399,6 +447,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     }
 
     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
     }
 
     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing direct canCommit of retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
+
         final Ready ready = checkReady();
         ready.stage = CommitStage.PRE_COMMIT_PENDING;
         LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
         final Ready ready = checkReady();
         ready.stage = CommitStage.PRE_COMMIT_PENDING;
         LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
@@ -416,6 +469,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     }
 
     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
     }
 
     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing direct commit of retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
+
         final Ready ready = checkReady();
         ready.stage = CommitStage.COMMIT_PENDING;
         LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
         final Ready ready = checkReady();
         ready.stage = CommitStage.COMMIT_PENDING;
         LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
@@ -433,6 +491,11 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     }
 
     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
     }
 
     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+        if (state instanceof Retired) {
+            LOG.debug("{}: Suppressing commit response on retired transaction {}", persistenceId(), getIdentifier());
+            return;
+        }
+
         recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
             envelope.getMessage().getSequence()));
         state = COMMITTED;
         recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
             envelope.getMessage().getSequence()));
         state = COMMITTED;
index 94093195c9b753e9f48646ce313de89f046ede58..2a4aeaa49b3d04b30b3859d99e75287e4f308ca5 100644 (file)
@@ -157,6 +157,8 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
     abstract TransactionSuccess<?> doHandleRequest(TransactionRequest<?> request, RequestEnvelope envelope,
             long now) throws RequestException;
 
     abstract TransactionSuccess<?> doHandleRequest(TransactionRequest<?> request, RequestEnvelope envelope,
             long now) throws RequestException;
 
+    abstract void retire();
+
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
             firstReplaySequence = sequence;
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
             firstReplaySequence = sequence;
index ba2bdb3c7bfc87ba0dcc479ec754d7ff8dd7a751..5a5e42637e6a5a4908a23dd85df8abd88589967b 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -28,6 +29,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
 import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
@@ -215,7 +217,25 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     void retire() {
     }
 
     void retire() {
-        // FIXME: flush all state
+        // Hunt down any transactions associated with this frontend
+        final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+        while (it.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = it.next();
+            if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+                if (cohort.getState() != State.COMMIT_PENDING) {
+                    LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+                    it.remove();
+                } else {
+                    LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
+                        cohort.getIdentifier());
+                }
+            }
+        }
+
+        // Clear out all transaction chains
+        localHistories.values().forEach(AbstractFrontendHistory::retire);
+        localHistories.clear();
+        standaloneHistory.retire();
     }
 
     @Override
     }
 
     @Override
index 10cef862fce2e42f731ebe01ff6775348c65476a..d9483d7b2b61915295eef094fc8c8570f0b63eaa 100644 (file)
@@ -185,6 +185,7 @@ public class Shard extends RaftActor {
 
     private final FrontendMetadata frontendMetadata;
     private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
 
     private final FrontendMetadata frontendMetadata;
     private Map<FrontendIdentifier, LeaderFrontendState> knownFrontends = ImmutableMap.of();
+    private boolean paused;
 
     private final MessageSlicer responseMessageSlicer;
     private final Dispatchers dispatchers;
 
     private final MessageSlicer responseMessageSlicer;
     private final Dispatchers dispatchers;
@@ -478,10 +479,10 @@ public class Shard extends RaftActor {
     private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
     private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
             throws RequestException {
         // We are not the leader, hence we want to fail-fast.
-        if (!isLeader() || !isLeaderActive()) {
-            LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
-                            + "isLeadershipTransferInProgress: {}.",
-                    persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+        if (!isLeader() || paused || !isLeaderActive()) {
+            LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+                            + "isLeadershipTransferInProgress: {}, paused: {}",
+                    persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
             throw new NotLeaderException(getSelf());
         }
 
             throw new NotLeaderException(getSelf());
         }
 
@@ -814,6 +815,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
                     persistenceId(), getId());
             }
 
+            paused = false;
             store.purgeLeaderState();
         }
 
             store.purgeLeaderState();
         }
 
@@ -825,19 +827,19 @@ public class Shard extends RaftActor {
     @Override
     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
     @Override
     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
         shardMBean.incrementLeadershipChangeCount();
+        paused = false;
 
 
-        final boolean hasLeader = hasLeader();
-        if (!hasLeader) {
-            // No leader implies we are not the leader, lose frontend state if we have any. This also places
-            // an explicit guard so the map will not get modified accidentally.
+        if (!isLeader()) {
             if (!knownFrontends.isEmpty()) {
                 LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
                 knownFrontends = ImmutableMap.of();
             }
             if (!knownFrontends.isEmpty()) {
                 LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
                 knownFrontends = ImmutableMap.of();
             }
-            return;
-        }
 
 
-        if (!isLeader()) {
+            if (!hasLeader()) {
+                // No leader anywhere, nothing else to do
+                return;
+            }
+
             // Another leader was elected. If we were the previous leader and had pending transactions, convert
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
             // Another leader was elected. If we were the previous leader and had pending transactions, convert
             // them to transaction messages and send to the new leader.
             ActorSelection leader = getLeader();
@@ -880,13 +882,24 @@ public class Shard extends RaftActor {
     @Override
     protected void pauseLeader(final Runnable operation) {
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
     @Override
     protected void pauseLeader(final Runnable operation) {
         LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+        paused = true;
+
+        // Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
+        knownFrontends.values().forEach(LeaderFrontendState::retire);
+        knownFrontends = ImmutableMap.of();
+
         store.setRunOnPendingTransactionsComplete(operation);
     }
 
     @Override
     protected void unpauseLeader() {
         LOG.debug("{}: In unpauseLeader", persistenceId());
         store.setRunOnPendingTransactionsComplete(operation);
     }
 
     @Override
     protected void unpauseLeader() {
         LOG.debug("{}: In unpauseLeader", persistenceId());
+        paused = false;
+
         store.setRunOnPendingTransactionsComplete(null);
         store.setRunOnPendingTransactionsComplete(null);
+
+        // Restore tell-based protocol state as if we were becoming the leader
+        knownFrontends = Verify.verifyNotNull(frontendMetadata.toLeaderState(this));
     }
 
     @Override
     }
 
     @Override
index 27577b27d8d4f9342a90ed6db72ec2a5ca70c06c..7aecda48db6ceaf7ca3a5dbc5ae266847b6676ca 100644 (file)
@@ -1133,4 +1133,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardStats getStats() {
         return shard.getShardMBean();
     }
     ShardStats getStats() {
         return shard.getShardMBean();
     }
+
+    Iterator<SimpleShardDataTreeCohort> cohortIterator() {
+        return Iterables.transform(Iterables.concat(pendingFinishCommits, pendingCommits, pendingTransactions),
+            e -> e.cohort).iterator();
+    }
+
+    void removeTransactionChain(final LocalHistoryIdentifier id) {
+        if (transactionChains.remove(id) != null) {
+            LOG.debug("{}: Removed transaction chain {}", logContext, id);
+        }
+    }
 }
 }