Bug 6540: Fix journal issues on leader changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 461dfe506db852afa47a7863cdc93f45e677840c..f13938cdb522a31fc1f4f24a4e8fb6f7d60f73ea 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -42,10 +43,8 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
-import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
@@ -53,13 +52,13 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
-import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
 import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
@@ -106,6 +105,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
         short payloadVersion = (short)5;
         actorContext.setPayloadVersion(payloadVersion);
 
@@ -201,6 +201,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
+        actorContext.setCommitIndex(-1);
+        actorContext.setLastApplied(-1);
 
         // The raft context is initialized with a couple log entries. However the commitIndex
         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
@@ -584,8 +586,10 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        fts.setSnapshotBytes(bs);
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
         fts.getNextChunk();
@@ -596,9 +600,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
 
-        AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
-
-        AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+        AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
 
@@ -738,6 +740,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getReplicatedLog().removeFrom(0);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         // Leader will send an immediate heartbeat - ignore it.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -761,7 +764,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
         // installed with a SendInstallSnapshot
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -919,8 +922,10 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
+                actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        fts.setSnapshotBytes(bs);
+        leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
         while(!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
             fts.incrementChunkIndex();
@@ -934,12 +939,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         assertTrue(raftBehavior instanceof Leader);
 
-        assertEquals(0, leader.followerSnapshotSize());
         assertEquals(1, leader.followerLogSize());
         FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
         assertNotNull(fli);
+        assertNull(fli.getInstallSnapshotState());
         assertEquals(commitIndex, fli.getMatchIndex());
         assertEquals(commitIndex + 1, fli.getNextIndex());
+        assertFalse(leader.hasSnapshot());
     }
 
     @Test
@@ -1129,7 +1135,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         assertEquals(1, installSnapshot.getChunkIndex());
         assertEquals(3, installSnapshot.getTotalChunks());
-        assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
+        assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
+                installSnapshot.getLastChunkHashCode().get().intValue());
 
         int hashCode = Arrays.hashCode(installSnapshot.getData());
 
@@ -1146,19 +1153,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testFollowerToSnapshotLogic() {
-        logStart("testFollowerToSnapshotLogic");
-
-        MockRaftActorContext actorContext = createActorContext();
-
-        actorContext.setConfigParams(new DefaultConfigParamsImpl() {
-            @Override
-            public int getSnapshotChunkSize() {
-                return 50;
-            }
-        });
-
-        leader = new Leader(actorContext);
+    public void testLeaderInstallSnapshotState() {
+        logStart("testLeaderInstallSnapshotState");
 
         Map<String, String> leadersSnapshot = new HashMap<>();
         leadersSnapshot.put("1", "A");
@@ -1168,8 +1164,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
-        leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
+        fts.setSnapshotBytes(bs);
 
         assertEquals(bs.size(), barray.length);
 
@@ -1274,7 +1270,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(-1, appendEntries.getLeaderCommit());
         assertEquals(0, appendEntries.getEntries().size());
         assertEquals(0, appendEntries.getPrevLogIndex());
 
@@ -1329,7 +1325,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // Initial heartbeat
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(-1, appendEntries.getLeaderCommit());
         assertEquals(0, appendEntries.getEntries().size());
         assertEquals(0, appendEntries.getPrevLogIndex());
 
@@ -1401,8 +1397,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
 
-        // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        // Verify initial AppendEntries sent.
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
 
@@ -1481,7 +1477,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
@@ -1562,7 +1558,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
@@ -1761,7 +1757,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
 
@@ -1987,6 +1983,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 new FiniteDuration(1000, TimeUnit.SECONDS));
 
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+        leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
 
         String nonVotingFollowerId = "nonvoting-follower";
         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
@@ -2047,6 +2045,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testTransferLeadershipWithFollowerInSync");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        leaderActorContext.setLastApplied(-1);
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());