X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=8529e1926b31ebb88548653594ec867af202b6ea;hb=224aa4f574c63576961dc9dc37e075e2e5096a5a;hp=c39c62c727b601b0dd20c9e59098345604490798;hpb=db8fed63e18fccd2721fa7e189b2278a4f240f2c;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index c39c62c727..8529e1926b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -37,6 +37,7 @@ import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; @@ -145,9 +146,13 @@ public class LeaderTest extends AbstractLeaderTest { private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + return sendReplicate(actorContext, 1, index); + } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 1, index, payload); + term, index, payload); actorContext.getReplicatedLog().append(newEntry); return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); } @@ -189,6 +194,58 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); } + @Test + public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception { + logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + // The raft context is initialized with a couple log entries. However the commitIndex + // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't + // committed and applied. Now it regains leadership with a higher term (2). + long prevTerm = actorContext.getTermInformation().getCurrentTerm(); + long newTerm = prevTerm + 1; + actorContext.getTermInformation().update(newTerm, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower replies with the leader's current last index and term, simulating that it is + // up to date with the leader. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0)); + + // The commit index should not get updated even though consensus was reached. This is b/c the + // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries + // from previous terms by counting replicas". + assertEquals("Commit Index", -1, actorContext.getCommitIndex()); + + followerActor.underlyingActor().clear(); + + // Now replicate a new entry with the new term 2. + long newIndex = lastIndex + 1; + sendReplicate(actorContext, newTerm, newIndex); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + + // The follower replies with success. The leader should now update the commit index to the new index + // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all + // prior entries are committed indirectly". + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0)); + + assertEquals("Commit Index", newIndex, actorContext.getCommitIndex()); + } + @Test public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); @@ -1602,7 +1659,10 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); + assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion()); short payloadVersion = 5; AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); @@ -1629,8 +1689,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); - FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals(2, followerInfo.getMatchIndex()); + assertEquals(3, followerInfo.getNextIndex()); assertEquals(payloadVersion, followerInfo.getPayloadVersion()); + assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion()); } @Test @@ -1943,7 +2005,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500); // Send reply from the voting follower and verify consensus. - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); } @@ -2094,20 +2156,6 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); } - @Test - public void testTransferLeadershipWithNoFollowers() { - logStart("testTransferLeadershipWithNoFollowers"); - - MockRaftActorContext leaderActorContext = createActorContext(); - - leader = new Leader(leaderActorContext); - - RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); - leader.transferLeadership(mockTransferCohort); - - verify(mockTransferCohort).transferComplete(); - } - @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception {