Add raftVersion field to AppendEntriesReply
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / behaviors / LeaderTest.java
index 6664600073f2c727936ece4da93f6cd5b76bb290..8529e1926b31ebb88548653594ec867af202b6ea 100644 (file)
@@ -11,6 +11,9 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -32,7 +35,9 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
@@ -41,6 +46,7 @@ import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -140,9 +146,13 @@ public class LeaderTest extends AbstractLeaderTest {
 
 
     private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+        return sendReplicate(actorContext, 1, index);
+    }
+
+    private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
         MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
         MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
-                1, index, payload);
+                term, index, payload);
         actorContext.getReplicatedLog().append(newEntry);
         return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
     }
@@ -184,6 +194,58 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
     }
 
+    @Test
+    public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+        logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        // The raft context is initialized with a couple log entries. However the commitIndex
+        // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
+        // committed and applied. Now it regains leadership with a higher term (2).
+        long prevTerm = actorContext.getTermInformation().getCurrentTerm();
+        long newTerm = prevTerm + 1;
+        actorContext.getTermInformation().update(newTerm, "");
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // The follower replies with the leader's current last index and term, simulating that it is
+        // up to date with the leader.
+        long lastIndex = actorContext.getReplicatedLog().lastIndex();
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
+
+        // The commit index should not get updated even though consensus was reached. This is b/c the
+        // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries
+        // from previous terms by counting replicas".
+        assertEquals("Commit Index", -1, actorContext.getCommitIndex());
+
+        followerActor.underlyingActor().clear();
+
+        // Now replicate a new entry with the new term 2.
+        long newIndex = lastIndex + 1;
+        sendReplicate(actorContext, newTerm, newIndex);
+
+        AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
+        assertEquals("Entries size", 1, appendEntries.getEntries().size());
+        assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
+        assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+        assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+
+        // The follower replies with success. The leader should now update the commit index to the new index
+        // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all
+        // prior entries are committed indirectly".
+        leader.handleMessage(followerActor, new AppendEntriesReply(
+                FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
+
+        assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
+    }
+
     @Test
     public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
         logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
@@ -1597,7 +1659,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         leader = new Leader(leaderActorContext);
 
+        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
         assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+        assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
 
         short payloadVersion = 5;
         AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
@@ -1624,8 +1689,10 @@ public class LeaderTest extends AbstractLeaderTest {
 
         assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
 
-        FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+        assertEquals(2, followerInfo.getMatchIndex());
+        assertEquals(3, followerInfo.getNextIndex());
         assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+        assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
     }
 
     @Test
@@ -1938,11 +2005,157 @@ public class LeaderTest extends AbstractLeaderTest {
         MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
 
         // Send reply from the voting follower and verify consensus.
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
 
         MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
     }
 
+    @Test
+    public void testTransferLeadershipWithFollowerInSync() {
+        logStart("testTransferLeadershipWithFollowerInSync");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, 0);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+        MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+        // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+        MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithEmptyLog() {
+        logStart("testTransferLeadershipWithEmptyLog");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(1000, TimeUnit.SECONDS));
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+
+        // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+        logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+
+        // Sync up the follower.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+        leader.handleMessage(leaderActor, new SendHeartBeat());
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+        // Leader should force an election timeout
+        MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+        verify(mockTransferCohort).transferComplete();
+    }
+
+    @Test
+    public void testTransferLeadershipWithFollowerSyncTimeout() {
+        logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+        MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+                new FiniteDuration(200, TimeUnit.MILLISECONDS));
+        ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+        leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+        leader = new Leader(leaderActorContext);
+
+        // Initial heartbeat
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+        MessageCollectorActor.clearMessages(followerActor);
+
+        sendReplicate(leaderActorContext, 0);
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+        leader.transferLeadership(mockTransferCohort);
+
+        verify(mockTransferCohort, never()).transferComplete();
+
+        // Send heartbeats to time out the transfer.
+        for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+                    getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+        }
+
+        verify(mockTransferCohort).abortTransfer();
+        verify(mockTransferCohort, never()).transferComplete();
+        MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
+    }
+
     @Override
     protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
             ActorRef actorRef, RaftRPC rpc) throws Exception {