Bug 6587: Retain state when transitioning between Leader and IsolatedLeader 99/44899/4
authorTom Pantelis <tpanteli@brocade.com>
Wed, 31 Aug 2016 04:55:57 +0000 (00:55 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 1 Sep 2016 02:57:55 +0000 (02:57 +0000)
If there's a transaction in the COMMIT_PENDING state, ie it has been persisted and
is in the process of being replicated, and the Leader switches to IsolatedLeader, the
ClientRequestTracker state is lost. As a result when the follower(s) come back and
replication consensus is achieved and the tx is applied to state, the tx ID isn't
available and the ShardDataTree applies it as a foreign candidate, leaving the
tx in the pending queue. This prevents subsequent transactions from making progress.

To fix this, we need to retain/copy the internal leader state when transitioning
between Leader and IsolatedLeader.

Change-Id: If06996dccf083fd5d37757fd91fde2eb0eb82ea1
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/IsolatedLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java

index befc6d412962a7395608c4ee1981b02014f5986d..b241e0a67a4f81118b1592e25bc1011e336318dc 100644 (file)
@@ -96,7 +96,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
     private Cancellable heartbeatSchedule = null;
-    private Optional<SnapshotHolder> snapshot;
+    private Optional<SnapshotHolder> snapshot = Optional.absent();;
     private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state,
@@ -105,6 +105,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if(initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
+            mapFollowerToSnapshot.putAll(initializeFromLeader.mapFollowerToSnapshot);
+            snapshot = initializeFromLeader.snapshot;
+            trackers.addAll(initializeFromLeader.trackers);
         } else {
             for(PeerInfo peerInfo: context.getPeers()) {
                 FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context);
@@ -116,8 +119,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         updateMinReplicaCount();
 
-        snapshot = Optional.absent();
-
         // Immediately schedule a heartbeat
         // Upon election: send initial empty AppendEntries RPCs
         // (heartbeat) to each server; repeat during idle periods to
@@ -299,7 +300,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             LOG.trace("{}: checking Nth index {}", logName(), N);
             for (FollowerLogInformation info : followerToLog.values()) {
                 final PeerInfo peerInfo = context.getPeerInfo(info.getId());
-                if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) {
+                if(info.getMatchIndex() >= N && peerInfo != null && peerInfo.isVoting()) {
                     replicatedCount++;
                 } else if(LOG.isTraceEnabled()) {
                     LOG.trace("{}: Not counting follower {} - matchIndex: {}, {}", logName(), info.getId(),
@@ -699,9 +700,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         // If the follower's nextIndex is -1 then we might as well send it a snapshot
         // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
         // in the snapshot
-        return (nextIndex == -1 ||
+        return nextIndex == -1 ||
                 (!context.getReplicatedLog().isPresent(nextIndex)
-                        && context.getReplicatedLog().isInSnapshot(nextIndex)));
+                        && context.getReplicatedLog().isInSnapshot(nextIndex));
 
     }
 
@@ -841,7 +842,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 }
             }
         }
-        return (minPresent != 0);
+        return minPresent != 0;
     }
 
     /**
@@ -863,8 +864,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         public FollowerToSnapshot(ByteString snapshotBytes) {
             this.snapshotBytes = snapshotBytes;
             int size = snapshotBytes.size();
-            totalChunks = ( size / context.getConfigParams().getSnapshotChunkSize()) +
-                ((size % context.getConfigParams().getSnapshotChunkSize()) > 0 ? 1 : 0);
+            totalChunks = (size / context.getConfigParams().getSnapshotChunkSize()) +
+                (size % context.getConfigParams().getSnapshotChunkSize() > 0 ? 1 : 0);
             if(LOG.isDebugEnabled()) {
                 LOG.debug("{}: Snapshot {} bytes, total chunks to send:{}",
                         logName(), size, totalChunks);
index d6a4e9d98e82db7a21f7de8b5c255e60b7301641..375b3779b891dd94e1a3b702f9396853478de265 100644 (file)
@@ -419,13 +419,14 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
     }
 
     protected RaftActorBehavior internalSwitchBehavior(RaftState newState) {
-        if(context.getRaftPolicy().automaticElectionsEnabled()){
-            return internalSwitchBehavior(createBehavior(context, newState));
-        }
-        return this;
+        return internalSwitchBehavior(createBehavior(context, newState));
     }
 
     protected RaftActorBehavior internalSwitchBehavior(RaftActorBehavior newBehavior) {
+        if(!context.getRaftPolicy().automaticElectionsEnabled()) {
+            return this;
+        }
+
         LOG.info("{} :- Switching from behavior {} to {}", logName(), this.state(), newBehavior.state());
         try {
             close();
index c969586722d3a849a7fd51ebf1d02c51463d4db3..a02e40092b3e314ee83bf0ae10e5a70e7106abea 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import akka.actor.ActorRef;
+import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -26,8 +27,12 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
  *
  */
 public class IsolatedLeader extends AbstractLeader {
+    IsolatedLeader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
+        super(context, RaftState.IsolatedLeader, initializeFromLeader);
+    }
+
     public IsolatedLeader(RaftActorContext context) {
-        super(context, RaftState.IsolatedLeader);
+        this(context, null);
     }
 
     // we received an Append Entries reply, we should switch the Behavior to Leader
@@ -40,7 +45,7 @@ public class IsolatedLeader extends AbstractLeader {
         // changes its state to Follower, hence we only need to switch to Leader if the state is still Isolated
         if (ret.state() == RaftState.IsolatedLeader && !isLeaderIsolated()) {
             LOG.info("IsolatedLeader {} switching from IsolatedLeader to Leader", getLeaderId());
-            return internalSwitchBehavior(RaftState.Leader);
+            return internalSwitchBehavior(new Leader(context, this));
         }
         return ret;
     }
index 58cf7161062c1eb67568930fb15252aadd21a23e..827364c29faeefd03149b5b65777195ca3afa5d6 100644 (file)
@@ -71,7 +71,7 @@ public class Leader extends AbstractLeader {
             if (isLeaderIsolated()) {
                 LOG.warn("{}: At least {} followers need to be active, Switching {} from Leader to IsolatedLeader",
                     context.getId(), getMinIsolatedLeaderPeerCount(), getLeaderId());
-                return internalSwitchBehavior(RaftState.IsolatedLeader);
+                return internalSwitchBehavior(new IsolatedLeader(context, this));
             } else {
                 return this;
             }
index d6faff694c66123d07576e44916d2d714eb45e2e..4c0aac48346a59e2db49db247cdaa37bbbd2abf0 100644 (file)
@@ -41,16 +41,12 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit.ShardStatsVerifier;
-import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
@@ -58,7 +54,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransact
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
@@ -706,19 +701,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         // Wait for the commit to be replicated to the follower.
 
-        MemberNode.verifyRaftState(followerDistributedDataStore, "cars", new RaftStateVerifier() {
-            @Override
-            public void verify(OnDemandRaftState raftState) {
-                assertEquals("getLastApplied", 0, raftState.getLastApplied());
-            }
-        });
+        MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
+                raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
 
-        MemberNode.verifyRaftState(followerDistributedDataStore, "people", new RaftStateVerifier() {
-            @Override
-            public void verify(OnDemandRaftState raftState) {
-                assertEquals("getLastApplied", 0, raftState.getLastApplied());
-            }
-        });
+        MemberNode.verifyRaftState(followerDistributedDataStore, "people",
+                raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
 
         // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
         // the leader shard.
@@ -769,12 +756,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         cars.add(CarsModel.newCarEntry("car" + carIndex, BigInteger.valueOf(carIndex)));
         readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
 
-        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
-            @Override
-            public void verify(ShardStats stats) {
-                assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount());
-            }
-        });
+        IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+                stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
 
         // Disable elections on the leader so it switches to follower.
 
@@ -828,24 +811,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
             writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
             DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
 
-            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
-                @Override
-                public void verify(ShardStats stats) {
-                    assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize());
-                }
-            });
+            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+                    stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
 
             writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
             MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
             writeTx.write(CarsModel.newCarPath("optima"), car);
             DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
 
-            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars", new ShardStatsVerifier() {
-                @Override
-                public void verify(ShardStats stats) {
-                    assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize());
-                }
-            });
+            IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+                    stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
 
             // Gracefully stop the leader via a Shutdown message.
 
@@ -877,29 +852,40 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
     @Test
     public void testTransactionWithIsolatedLeader() throws Throwable {
-        leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(200);
+        // Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
+        leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
         String testName = "testTransactionWithIsolatedLeader";
         initDatastoresWithCars(testName);
 
-        DOMStoreWriteTransaction failWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
-        failWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        // Tx that is submitted after the follower is stopped but before the leader transitions to IsolatedLeader.
+        DOMStoreWriteTransaction preIsolatedLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        preIsolatedLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
+        // Tx that is submitted after the leader transitions to IsolatedLeader.
+        DOMStoreWriteTransaction noShardLeaderWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+        noShardLeaderWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        // Tx that is submitted after the follower is reinstated.
         DOMStoreWriteTransaction successWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
         successWriteTx.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
 
+        // Stop the follower
         followerTestKit.watch(followerDistributedDataStore.getActorContext().getShardManager());
         followerDistributedDataStore.close();
         followerTestKit.expectTerminated(followerDistributedDataStore.getActorContext().getShardManager());
 
-        MemberNode.verifyRaftState(leaderDistributedDataStore, "cars", new RaftStateVerifier() {
-            @Override
-            public void verify(OnDemandRaftState raftState) {
-                assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState());
-            }
-        });
+        // Submit the preIsolatedLeaderWriteTx so it's pending
+        DOMStoreThreePhaseCommitCohort preIsolatedLeaderTxCohort = preIsolatedLeaderWriteTx.ready();
+
+        // Change the isolated leader check interval low so it changes to IsolatedLeader.
+        sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
+                shardIsolatedLeaderCheckIntervalInMillis(200));
+
+        MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
+                raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
 
         try {
-            leaderTestKit.doCommit(failWriteTx.ready());
+            leaderTestKit.doCommit(noShardLeaderWriteTx.ready());
             fail("Expected NoShardLeaderException");
         } catch (ExecutionException e) {
             assertEquals("getCause", NoShardLeaderException.class, e.getCause().getClass());
@@ -908,12 +894,13 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder.
                 shardElectionTimeoutFactor(100));
 
-        DOMStoreThreePhaseCommitCohort writeTxCohort = successWriteTx.ready();
+        DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
 
         followerDistributedDataStore = followerTestKit.setupDistributedDataStore(testName,
                 MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
 
-        leaderTestKit.doCommit(writeTxCohort);
+        leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
+        leaderTestKit.doCommit(successTxCohort);
     }
 
     @Test(expected=AskTimeoutException.class)
@@ -1017,12 +1004,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
-        Answer<DatastoreContext> answer = new Answer<DatastoreContext>() {
-            @Override
-            public DatastoreContext answer(InvocationOnMock invocation) {
-                return newBuilder.build();
-            }
-        };
+        Answer<DatastoreContext> answer = invocation -> newBuilder.build();
         Mockito.doAnswer(answer).when(mockContextFactory).getBaseDatastoreContext();
         Mockito.doAnswer(answer).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
         dataStore.onDatastoreContextUpdated(mockContextFactory);