X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=1caf631cee883c8ed40d8bc15eca1392c69bac85;hp=6664600073f2c727936ece4da93f6cd5b76bb290;hb=93e6f3bfc003d4ce2d968761dff963615a0b799d;hpb=e46efc1aec2ec777a876d8fae0b7598e76291302;ds=sidebyside diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 6664600073..1caf631cee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -10,7 +10,12 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -20,19 +25,21 @@ import akka.testkit.TestActorRef; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.After; -import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; @@ -41,7 +48,7 @@ import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; @@ -56,9 +63,10 @@ import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractLeaderTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; public static final String LEADER_ID = "leader"; @@ -88,10 +96,8 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(createActorContext()); - // handle message should return the Leader state when it receives an - // unknown message - RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo"); - Assert.assertTrue(behavior instanceof Leader); + // handle message should null when it receives an unknown message + assertNull(leader.handleMessage(followerActor, "foo")); } @Test @@ -106,6 +112,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(term, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader should send an immediate heartbeat with no entries as follower is inactive. long lastIndex = actorContext.getReplicatedLog().lastIndex(); @@ -127,7 +134,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); @@ -140,9 +147,13 @@ public class LeaderTest extends AbstractLeaderTest { private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + return sendReplicate(actorContext, 1, index); + } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 1, index, payload); + term, index, payload); actorContext.getReplicatedLog().append(newEntry); return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); } @@ -184,6 +195,59 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); } + @Test + public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception { + logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + // The raft context is initialized with a couple log entries. However the commitIndex + // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't + // committed and applied. Now it regains leadership with a higher term (2). + long prevTerm = actorContext.getTermInformation().getCurrentTerm(); + long newTerm = prevTerm + 1; + actorContext.getTermInformation().update(newTerm, ""); + + leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower replies with the leader's current last index and term, simulating that it is + // up to date with the leader. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0)); + + // The commit index should not get updated even though consensus was reached. This is b/c the + // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries + // from previous terms by counting replicas". + assertEquals("Commit Index", -1, actorContext.getCommitIndex()); + + followerActor.underlyingActor().clear(); + + // Now replicate a new entry with the new term 2. + long newIndex = lastIndex + 1; + sendReplicate(actorContext, newTerm, newIndex); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + + // The follower replies with success. The leader should now update the commit index to the new index + // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all + // prior entries are committed indirectly". + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0)); + + assertEquals("Commit Index", newIndex, actorContext.getCommitIndex()); + } + @Test public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); @@ -345,7 +409,7 @@ public class LeaderTest extends AbstractLeaderTest { // Wait slightly longer than heartbeat duration Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); @@ -387,7 +451,7 @@ public class LeaderTest extends AbstractLeaderTest { for(int i=0;i<3;i++) { Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); } List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); @@ -423,7 +487,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); sendReplicate(actorContext, lastIndex+1); List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); @@ -451,8 +515,8 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(leaderActor, "state-id", newEntry)); + final Identifier id = new MockIdentifier("state-id"); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -473,7 +537,7 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState last = applyStateList.get((int) newLogIndex - 1); assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); - assertEquals("getIdentifier", "state-id", last.getIdentifier()); + assertEquals("getIdentifier", id, last.getIdentifier()); } @Test @@ -529,7 +593,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -540,7 +604,7 @@ public class LeaderTest extends AbstractLeaderTest { //InstallSnapshotReply received fts.markSendStatus(true); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -589,7 +653,7 @@ public class LeaderTest extends AbstractLeaderTest { // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, "state-id", entry)); + leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); assertTrue(raftBehavior instanceof Leader); @@ -634,7 +698,7 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -647,9 +711,9 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); - Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @Test @@ -708,10 +772,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); - // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + // if an initiate is started again when first is in progress, it should not initiate Capture + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); - Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -832,6 +896,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -900,6 +965,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -948,7 +1014,7 @@ public class LeaderTest extends AbstractLeaderTest { installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class); - Assert.assertNull(installSnapshot); + assertNull(installSnapshot); } @@ -1008,7 +1074,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1064,7 +1130,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(3, installSnapshot.getTotalChunks()); assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); - int hashCode = installSnapshot.getData().hashCode(); + int hashCode = Arrays.hashCode(installSnapshot.getData()); followerActor.underlyingActor().clear(); @@ -1115,8 +1181,8 @@ public class LeaderTest extends AbstractLeaderTest { j = barray.length; } - ByteString chunk = fts.getNextChunk(); - assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size()); + byte[] chunk = fts.getNextChunk(); + assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length); assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex()); fts.markSendStatus(true); @@ -1128,8 +1194,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); } - @Override protected RaftActorBehavior createBehavior( - RaftActorContext actorContext) { + @Override + protected Leader createBehavior(final RaftActorContext actorContext) { return new Leader(actorContext); } @@ -1179,6 +1245,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1202,6 +1269,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActorContext.setCommitIndex(1); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -1233,6 +1301,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map leaderPeerAddresses = new HashMap<>(); leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1278,7 +1347,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -1400,6 +1469,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1415,6 +1485,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1479,6 +1550,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1494,6 +1566,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1597,7 +1670,10 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); + assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion()); short payloadVersion = 5; AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); @@ -1624,8 +1700,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); - FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals(2, followerInfo.getMatchIndex()); + assertEquals(3, followerInfo.getNextIndex()); assertEquals(payloadVersion, followerInfo.getPayloadVersion()); + assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion()); } @Test @@ -1671,6 +1749,7 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); @@ -1686,6 +1765,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1751,8 +1831,27 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); leader = new Leader(leaderActorContext); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue(behavior instanceof Leader); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue(behavior instanceof Leader); + } + + @Test + public void testIsolatedLeaderCheckNoVotingFollowers() { + logStart("testIsolatedLeaderCheckNoVotingFollowers"); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING); + + leader = new Leader(leaderActorContext); + leader.getFollower(FOLLOWER_ID).markFollowerActive(); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Expected Leader", behavior instanceof Leader); } private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){ @@ -1772,9 +1871,8 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive("follower-1"); leader.markFollowerActive("follower-2"); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when all followers are active", - behavior instanceof Leader); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Behavior not instance of Leader when all followers are active", behavior instanceof Leader); // kill 1 follower and verify if that got killed final JavaTestKit probe = new JavaTestKit(getSystem()); @@ -1785,9 +1883,8 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerInActive("follower-1"); leader.markFollowerActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", - behavior instanceof Leader); + behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Behavior not instance of Leader when majority of followers are active", behavior instanceof Leader); // kill 2nd follower and leader should change to Isolated leader followerActor2.tell(PoisonPill.getInstance(), null); @@ -1797,7 +1894,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(termMsg2.getActor(), followerActor2); leader.markFollowerInActive("follower-2"); - return leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); } @Test @@ -1806,7 +1903,7 @@ public class LeaderTest extends AbstractLeaderTest { RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); - Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", + assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", behavior instanceof IsolatedLeader); } @@ -1816,7 +1913,7 @@ public class LeaderTest extends AbstractLeaderTest { RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); - Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", + assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", behavior instanceof Leader); } @@ -1897,6 +1994,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Ignore initial heartbeats MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -1938,13 +2036,163 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500); // Send reply from the voting follower and verify consensus. - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); } + @Test + public void testTransferLeadershipWithFollowerInSync() { + logStart("testTransferLeadershipWithFollowerInSync"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithEmptyLog() { + logStart("testTransferLeadershipWithEmptyLog"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerInitiallyOutOfSync() { + logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Sync up the follower. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). + getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerSyncTimeout() { + logStart("testTransferLeadershipWithFollowerSyncTimeout"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Send heartbeats to time out the transfer. + for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). + getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + } + + verify(mockTransferCohort).abortTransfer(); + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); + } + @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());