X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=c39c62c727b601b0dd20c9e59098345604490798;hp=89b83b3b369e8e5ed70349e2a1c8fcd223d7c9b4;hb=db8fed63e18fccd2721fa7e189b2278a4f240f2c;hpb=ed7166c5ba3a378f46640bb479464580c2b167ba diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 89b83b3b36..c39c62c727 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,8 +1,19 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -24,14 +35,17 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; @@ -43,6 +57,8 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; @@ -170,6 +186,45 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); + } + + @Test + public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { + logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setRaftPolicy(createRaftPolicy(true, true)); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short) 0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1); + + // State should not change + assertTrue(raftBehavior instanceof Leader); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex()); } @Test @@ -552,11 +607,6 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - //clears leaders log actorContext.getReplicatedLog().removeFrom(0); @@ -607,6 +657,69 @@ public class LeaderTest extends AbstractLeaderTest { Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } + @Test + public void testInitiateForceInstallSnapshot() throws Exception { + logStart("testInitiateForceInstallSnapshot"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + final int followersLastIndex = 2; + final int snapshotIndex = -1; + final int newEntryIndex = 4; + final int snapshotTerm = -1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setLastApplied(3); + actorContext.setCommitIndex(followersLastIndex); + + actorContext.getReplicatedLog().removeFrom(0); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // set the snapshot as absent and check if capture-snapshot is invoked. + leader.setSnapshot(null); + + for(int i=0;i<4;i++) { + actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1, + new MockRaftActorContext.MockPayload("X" + i))); + } + + // new entry + ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + actorContext.getReplicatedLog().append(entry); + + //update follower timestamp + leader.markFollowerActive(FOLLOWER_ID); + + // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets + // installed with a SendInstallSnapshot + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true)); + + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); + + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); + + assertTrue(cs.isInstallSnapshotInitiated()); + assertEquals(3, cs.getLastAppliedIndex()); + assertEquals(1, cs.getLastAppliedTerm()); + assertEquals(4, cs.getLastIndex()); + assertEquals(2, cs.getLastTerm()); + + // if an initiate is started again when first is in progress, it shouldnt initiate Capture + leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + + Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + } + + @Test public void testInstallSnapshot() throws Exception { logStart("testInstallSnapshot"); @@ -660,6 +773,56 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(currentTerm, installSnapshot.getTerm()); } + @Test + public void testForceInstallSnapshot() throws Exception { + logStart("testForceInstallSnapshot"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + final int lastAppliedIndex = 3; + final int snapshotIndex = -1; + final int snapshotTerm = -1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setCommitIndex(lastAppliedIndex); + actorContext.setLastApplied(lastAppliedIndex); + + leader = new Leader(actorContext); + + // Initial heartbeat. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(-1); + + Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), + Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + + assertTrue(raftBehavior instanceof Leader); + + // check if installsnapshot gets called with the correct values. + + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + + assertNotNull(installSnapshot.getData()); + assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); + + assertEquals(currentTerm, installSnapshot.getTerm()); + } + @Test public void testHandleInstallSnapshotReplyLastChunk() throws Exception { logStart("testHandleInstallSnapshotReplyLastChunk"); @@ -1377,6 +1540,53 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm()); } + @Test + public void testHandleAppendEntriesReplyWithNewerTerm(){ + logStart("testHandleAppendEntriesReplyWithNewerTerm"); + + MockRaftActorContext leaderActorContext = createActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(10000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + + leader = new Leader(leaderActorContext); + leaderActor.underlyingActor().setBehavior(leader); + leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender()); + + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertEquals(false, appendEntriesReply.isSuccess()); + assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state()); + + MessageCollectorActor.clearMessages(leaderActor); + } + + @Test + public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){ + logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled"); + + MockRaftActorContext leaderActorContext = createActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(10000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + leaderActorContext.setRaftPolicy(createRaftPolicy(false, false)); + + leader = new Leader(leaderActorContext); + leaderActor.underlyingActor().setBehavior(leader); + leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender()); + + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertEquals(false, appendEntriesReply.isSuccess()); + assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state()); + + MessageCollectorActor.clearMessages(leaderActor); + } + @Test public void testHandleAppendEntriesReplySuccess() throws Exception { logStart("testHandleAppendEntriesReplySuccess"); @@ -1445,7 +1655,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContextWithFollower(); ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(1000, TimeUnit.SECONDS)); - ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnaphotChunkSize(2); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2); leaderActorContext.setReplicatedLog( new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build()); @@ -1550,56 +1760,69 @@ public class LeaderTest extends AbstractLeaderTest { Assert.assertTrue(behavior instanceof Leader); } - @Test - public void testIsolatedLeaderCheckTwoFollowers() throws Exception { - logStart("testIsolatedLeaderCheckTwoFollowers"); + private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){ + ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1"); + ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2"); - new JavaTestKit(getSystem()) {{ - - ActorRef followerActor1 = getTestActor(); - ActorRef followerActor2 = getTestActor(); - - MockRaftActorContext leaderActorContext = createActorContext(); + MockRaftActorContext leaderActorContext = createActorContext(); - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-1", followerActor1.path().toString()); - peerAddresses.put("follower-2", followerActor2.path().toString()); + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + peerAddresses.put("follower-2", followerActor2.path().toString()); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setRaftPolicy(raftPolicy); - leader = new Leader(leaderActorContext); + leader = new Leader(leaderActorContext); - leader.markFollowerActive("follower-1"); - leader.markFollowerActive("follower-2"); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when all followers are active", + leader.markFollowerActive("follower-1"); + leader.markFollowerActive("follower-2"); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader); - // kill 1 follower and verify if that got killed - final JavaTestKit probe = new JavaTestKit(getSystem()); - probe.watch(followerActor1); - followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); - final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); - assertEquals(termMsg1.getActor(), followerActor1); - - leader.markFollowerInActive("follower-1"); - leader.markFollowerActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", + // kill 1 follower and verify if that got killed + final JavaTestKit probe = new JavaTestKit(getSystem()); + probe.watch(followerActor1); + followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); + final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); + assertEquals(termMsg1.getActor(), followerActor1); + + leader.markFollowerInActive("follower-1"); + leader.markFollowerActive("follower-2"); + behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader); - // kill 2nd follower and leader should change to Isolated leader - followerActor2.tell(PoisonPill.getInstance(), null); - probe.watch(followerActor2); - followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender()); - final Terminated termMsg2 = probe.expectMsgClass(Terminated.class); - assertEquals(termMsg2.getActor(), followerActor2); - - leader.markFollowerInActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", - behavior instanceof IsolatedLeader); - }}; + // kill 2nd follower and leader should change to Isolated leader + followerActor2.tell(PoisonPill.getInstance(), null); + probe.watch(followerActor2); + followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender()); + final Terminated termMsg2 = probe.expectMsgClass(Terminated.class); + assertEquals(termMsg2.getActor(), followerActor2); + + leader.markFollowerInActive("follower-2"); + return leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + } + + @Test + public void testIsolatedLeaderCheckTwoFollowers() throws Exception { + logStart("testIsolatedLeaderCheckTwoFollowers"); + + RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); + + Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", + behavior instanceof IsolatedLeader); + } + + @Test + public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception { + logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled"); + + RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); + + Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", + behavior instanceof Leader); } @Test @@ -1662,6 +1885,229 @@ public class LeaderTest extends AbstractLeaderTest { }}; } + @Test + public void testReplicationConsensusWithNonVotingFollower() { + logStart("testReplicationConsensusWithNonVotingFollower"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + String nonVotingFollowerId = "nonvoting-follower"; + TestActorRef nonVotingFollowerActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId)); + + leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); + + leader = new Leader(leaderActorContext); + + // Ignore initial heartbeats + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(nonVotingFollowerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Send a Replicate message and wait for AppendEntries. + sendReplicate(leaderActorContext, 0); + + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class); + + // Send reply only from the voting follower and verify consensus via ApplyState. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + + leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0)); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(nonVotingFollowerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Send another Replicate message + sendReplicate(leaderActorContext, 1); + + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, + AppendEntries.class); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + + // Send reply only from the non-voting follower and verify no consensus via no ApplyState. + leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0)); + + MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500); + + // Send reply from the voting follower and verify consensus. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + } + + @Test + public void testTransferLeadershipWithFollowerInSync() { + logStart("testTransferLeadershipWithFollowerInSync"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithEmptyLog() { + logStart("testTransferLeadershipWithEmptyLog"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerInitiallyOutOfSync() { + logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Sync up the follower. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). + getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerSyncTimeout() { + logStart("testTransferLeadershipWithFollowerSyncTimeout"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Send heartbeats to time out the transfer. + for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). + getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + } + + verify(mockTransferCohort).abortTransfer(); + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); + } + + @Test + public void testTransferLeadershipWithNoFollowers() { + logStart("testTransferLeadershipWithNoFollowers"); + + MockRaftActorContext leaderActorContext = createActorContext(); + + leader = new Leader(leaderActorContext); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort).transferComplete(); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception {