X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=479ba8fbc5858c938eef4cd9075840835b16a039;hb=af406fdd1a948e05014b95e2d23d59cda1ebd837;hp=27854f40e957a7a395fcd3e8c0cd5df871b3e00e;hpb=a564647b197ef00124b9ae6aa00578dd73e27aa1;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 27854f40e9..479ba8fbc5 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,18 +1,30 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -24,13 +36,18 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.SerializationUtils; +import org.opendaylight.controller.cluster.raft.Snapshot; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; @@ -42,6 +59,8 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; @@ -58,6 +77,7 @@ public class LeaderTest extends AbstractLeaderTest { Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower")); private Leader leader; + private final short payloadVersion = 5; @Override @After @@ -127,9 +147,13 @@ public class LeaderTest extends AbstractLeaderTest { private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + return sendReplicate(actorContext, 1, index); + } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 1, index, payload); + term, index, payload); actorContext.getReplicatedLog().append(newEntry); return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); } @@ -156,11 +180,97 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); - MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 1, lastIndex + 1, payload); - actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex+1); + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1); + + // State should not change + assertTrue(raftBehavior instanceof Leader); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); + } + + @Test + public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception { + logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + // The raft context is initialized with a couple log entries. However the commitIndex + // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't + // committed and applied. Now it regains leadership with a higher term (2). + long prevTerm = actorContext.getTermInformation().getCurrentTerm(); + long newTerm = prevTerm + 1; + actorContext.getTermInformation().update(newTerm, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower replies with the leader's current last index and term, simulating that it is + // up to date with the leader. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0)); + + // The commit index should not get updated even though consensus was reached. This is b/c the + // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries + // from previous terms by counting replicas". + assertEquals("Commit Index", -1, actorContext.getCommitIndex()); + + followerActor.underlyingActor().clear(); + + // Now replicate a new entry with the new term 2. + long newIndex = lastIndex + 1; + sendReplicate(actorContext, newTerm, newIndex); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + + // The follower replies with success. The leader should now update the commit index to the new index + // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all + // prior entries are committed indirectly". + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0)); + + assertEquals("Commit Index", newIndex, actorContext.getCommitIndex()); + } + + @Test + public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { + logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setRaftPolicy(createRaftPolicy(true, true)); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short) 0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -171,7 +281,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entries size", 1, appendEntries.getEntries().size()); assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); - assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex()); } @Test @@ -442,8 +553,8 @@ public class LeaderTest extends AbstractLeaderTest { //clears leaders log actorContext.getReplicatedLog().removeFrom(0); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int newEntryIndex = 4; final int snapshotTerm = 1; final int currentTerm = 2; @@ -451,12 +562,15 @@ public class LeaderTest extends AbstractLeaderTest { // set the snapshot variables in replicatedlog actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); //set follower timeout to 2 mins, helps during debugging actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10)); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + // new entry ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, @@ -466,7 +580,8 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm)); FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); leader.setFollowerSnapshot(FOLLOWER_ID, fts); @@ -492,7 +607,7 @@ public class LeaderTest extends AbstractLeaderTest { InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); - assertEquals(snapshotIndex, is.getLastIncludedIndex()); + assertEquals(commitIndex, is.getLastIncludedIndex()); } @Test @@ -550,11 +665,6 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - //clears leaders log actorContext.getReplicatedLog().removeFrom(0); @@ -576,7 +686,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(Optional.absent()); + leader.setSnapshot(null); // new entry ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, @@ -605,6 +715,69 @@ public class LeaderTest extends AbstractLeaderTest { Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } + @Test + public void testInitiateForceInstallSnapshot() throws Exception { + logStart("testInitiateForceInstallSnapshot"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + final int followersLastIndex = 2; + final int snapshotIndex = -1; + final int newEntryIndex = 4; + final int snapshotTerm = -1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.setLastApplied(3); + actorContext.setCommitIndex(followersLastIndex); + + actorContext.getReplicatedLog().removeFrom(0); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // set the snapshot as absent and check if capture-snapshot is invoked. + leader.setSnapshot(null); + + for(int i=0;i<4;i++) { + actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1, + new MockRaftActorContext.MockPayload("X" + i))); + } + + // new entry + ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + actorContext.getReplicatedLog().append(entry); + + //update follower timestamp + leader.markFollowerActive(FOLLOWER_ID); + + // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets + // installed with a SendInstallSnapshot + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true)); + + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); + + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); + + assertTrue(cs.isInstallSnapshotInitiated()); + assertEquals(3, cs.getLastAppliedIndex()); + assertEquals(1, cs.getLastAppliedTerm()); + assertEquals(4, cs.getLastIndex()); + assertEquals(2, cs.getLastTerm()); + + // if an initiate is started again when first is in progress, it shouldnt initiate Capture + leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + + Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + } + + @Test public void testInstallSnapshot() throws Exception { logStart("testInstallSnapshot"); @@ -619,8 +792,8 @@ public class LeaderTest extends AbstractLeaderTest { //clears leaders log actorContext.getReplicatedLog().removeFrom(0); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int lastAppliedIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -628,15 +801,72 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(lastAppliedIndex); + actorContext.setLastApplied(lastAppliedIndex); leader = new Leader(actorContext); - // Ignore initial heartbeat. + // Initial heartbeat. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new SendInstallSnapshot(toByteString(leadersSnapshot))); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + + Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), + Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + + assertTrue(raftBehavior instanceof Leader); + + // check if installsnapshot gets called with the correct values. + + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + + assertNotNull(installSnapshot.getData()); + assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); + + assertEquals(currentTerm, installSnapshot.getTerm()); + } + + @Test + public void testForceInstallSnapshot() throws Exception { + logStart("testForceInstallSnapshot"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + final int lastAppliedIndex = 3; + final int snapshotIndex = -1; + final int snapshotTerm = -1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setCommitIndex(lastAppliedIndex); + actorContext.setLastApplied(lastAppliedIndex); + + leader = new Leader(actorContext); + + // Initial heartbeat. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(-1); + + Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), + Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); assertTrue(raftBehavior instanceof Leader); @@ -645,7 +875,7 @@ public class LeaderTest extends AbstractLeaderTest { InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); assertNotNull(installSnapshot.getData()); - assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); assertEquals(currentTerm, installSnapshot.getTerm()); @@ -657,15 +887,18 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + // Ignore initial heartbeat. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -681,7 +914,8 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm)); FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); leader.setFollowerSnapshot(FOLLOWER_ID, fts); while(!fts.isLastChunk(fts.getChunkIndex())) { @@ -701,9 +935,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(1, leader.followerLogSize()); FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID); assertNotNull(fli); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex + 1, fli.getNextIndex()); + assertEquals(commitIndex, fli.getMatchIndex()); + assertEquals(commitIndex + 1, fli.getNextIndex()); } @Test @@ -712,8 +945,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -727,10 +960,13 @@ public class LeaderTest extends AbstractLeaderTest { configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); actorContext.setConfigParams(configParams); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -742,9 +978,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -783,8 +1021,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -795,10 +1033,13 @@ public class LeaderTest extends AbstractLeaderTest { } }); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -810,10 +1051,12 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -842,8 +1085,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -854,10 +1097,13 @@ public class LeaderTest extends AbstractLeaderTest { } }); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -869,9 +1115,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), + commitIndex, snapshotTerm, commitIndex, snapshotTerm); + leader.setSnapshot(snapshot); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -879,7 +1127,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(3, installSnapshot.getTotalChunks()); assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); - int hashCode = installSnapshot.getData().hashCode(); + int hashCode = Arrays.hashCode(installSnapshot.getData()); followerActor.underlyingActor().clear(); @@ -930,8 +1178,8 @@ public class LeaderTest extends AbstractLeaderTest { j = barray.length; } - ByteString chunk = fts.getNextChunk(); - assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size()); + byte[] chunk = fts.getNextChunk(); + assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length); assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex()); fts.markSendStatus(true); @@ -971,6 +1219,7 @@ public class LeaderTest extends AbstractLeaderTest { configParams.setElectionTimeoutFactor(100000); MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef); context.setConfigParams(configParams); + context.setPayloadVersion(payloadVersion); return context; } @@ -1124,12 +1373,15 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setCommitIndex(leaderCommitIndex); leaderActorContext.setLastApplied(leaderCommitIndex); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); followerActorContext.setReplicatedLog( new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); - followerActorContext.setCommitIndex(1); - followerActorContext.setLastApplied(1); + followerActorContext.setCommitIndex(0); + followerActorContext.setLastApplied(0); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); @@ -1151,26 +1403,37 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(followerActor, appendEntriesReply); - MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); - List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // Verify AppendEntries sent with the leader's second log entry. - appendEntries = appendEntriesList.get(0); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - // Verify AppendEntries sent with the leader's third log entry. - appendEntries = appendEntriesList.get(1); - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 2, appendEntries.getEntries().get(0).getIndex()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersThirdLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals("getNextIndex", 3, followerInfo.getNextIndex()); + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex()); assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex()); } @@ -1189,6 +1452,9 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setCommitIndex(leaderCommitIndex); leaderActorContext.setLastApplied(leaderCommitIndex); + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); @@ -1215,26 +1481,37 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(followerActor, appendEntriesReply); - MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); - List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // Verify AppendEntries sent with the leader's first log entry. - appendEntries = appendEntriesList.get(0); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex()); assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - // Verify AppendEntries sent with the leader's second log entry. - appendEntries = appendEntriesList.get(1); - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); } @@ -1283,22 +1560,21 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(followerActor, appendEntriesReply); - MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); - List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - // Verify AppendEntries sent with the leader's first log entry. - appendEntries = appendEntriesList.get(0); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 0, appendEntries.getEntries().get(0).getIndex()); assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - // Verify AppendEntries sent with the leader's third log entry. - appendEntries = appendEntriesList.get(1); - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); @@ -1323,26 +1599,78 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplySuccess() throws Exception { - logStart("testHandleAppendEntriesReplySuccess"); + public void testHandleAppendEntriesReplyWithNewerTerm(){ + logStart("testHandleAppendEntriesReplyWithNewerTerm"); - MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + MockRaftActorContext leaderActorContext = createActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(10000, TimeUnit.SECONDS)); leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - - leaderActorContext.setCommitIndex(1); - leaderActorContext.setLastApplied(1); - leaderActorContext.getTermInformation().update(1, "leader"); + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); leader = new Leader(leaderActorContext); + leaderActor.underlyingActor().setBehavior(leader); + leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender()); - short payloadVersion = 5; - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); - - RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - assertEquals(RaftState.Leader, raftActorBehavior.state()); + assertEquals(false, appendEntriesReply.isSuccess()); + assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state()); + + MessageCollectorActor.clearMessages(leaderActor); + } + + @Test + public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){ + logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled"); + + MockRaftActorContext leaderActorContext = createActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(10000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + leaderActorContext.setRaftPolicy(createRaftPolicy(false, false)); + + leader = new Leader(leaderActorContext); + leaderActor.underlyingActor().setBehavior(leader); + leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender()); + + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertEquals(false, appendEntriesReply.isSuccess()); + assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state()); + + MessageCollectorActor.clearMessages(leaderActor); + } + + @Test + public void testHandleAppendEntriesReplySuccess() throws Exception { + logStart("testHandleAppendEntriesReplySuccess"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + + leaderActorContext.setCommitIndex(1); + leaderActorContext.setLastApplied(1); + leaderActorContext.getTermInformation().update(1, "leader"); + + leader = new Leader(leaderActorContext); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + + assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); + assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion()); + + short payloadVersion = 5; + AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); + + RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + + assertEquals(RaftState.Leader, raftActorBehavior.state()); assertEquals(2, leaderActorContext.getCommitIndex()); @@ -1362,8 +1690,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); - FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals(2, followerInfo.getMatchIndex()); + assertEquals(3, followerInfo.getNextIndex()); assertEquals(payloadVersion, followerInfo.getPayloadVersion()); + assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion()); } @Test @@ -1382,162 +1712,180 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleRequestVoteReply(){ - logStart("testHandleRequestVoteReply"); + public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() { + logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded"); - MockRaftActorContext leaderActorContext = createActorContext(); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2); - leader = new Leader(leaderActorContext); + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build()); + long leaderCommitIndex = 3; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); - // Should be a no-op. - RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor, - new RequestVoteReply(1, true)); + ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3); - assertEquals(RaftState.Leader, raftActorBehavior.state()); + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); - raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false)); + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); - assertEquals(RaftState.Leader, raftActorBehavior.state()); - } + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); - @Test - public void testIsolatedLeaderCheckNoFollowers() { - logStart("testIsolatedLeaderCheckNoFollowers"); + leader = new Leader(leaderActorContext); - MockRaftActorContext leaderActorContext = createActorContext(); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - leader = new Leader(leaderActorContext); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue(behavior instanceof Leader); - } + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); - @Test - public void testIsolatedLeaderCheckTwoFollowers() throws Exception { - logStart("testIsolatedLeaderCheckTwoFollowers"); + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); - new JavaTestKit(getSystem()) {{ + leaderActor.underlyingActor().setBehavior(leader); - ActorRef followerActor1 = getTestActor(); - ActorRef followerActor2 = getTestActor(); + leader.handleMessage(followerActor, appendEntriesReply); - MockRaftActorContext leaderActorContext = createActorContext(); + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-1", followerActor1.path().toString()); - peerAddresses.put("follower-2", followerActor2.path().toString()); + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - leaderActorContext.setPeerAddresses(peerAddresses); + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - leader = new Leader(leaderActorContext); + assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersThirdLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersFourthLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); - leader.markFollowerActive("follower-1"); - leader.markFollowerActive("follower-2"); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when all followers are active", - behavior instanceof Leader); + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 4, followerInfo.getNextIndex()); - // kill 1 follower and verify if that got killed - final JavaTestKit probe = new JavaTestKit(getSystem()); - probe.watch(followerActor1); - followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); - final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); - assertEquals(termMsg1.getActor(), followerActor1); - - leader.markFollowerInActive("follower-1"); - leader.markFollowerActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", - behavior instanceof Leader); + MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4); - // kill 2nd follower and leader should change to Isolated leader - followerActor2.tell(PoisonPill.getInstance(), null); - probe.watch(followerActor2); - followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender()); - final Terminated termMsg2 = probe.expectMsgClass(Terminated.class); - assertEquals(termMsg2.getActor(), followerActor2); - - leader.markFollowerInActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", - behavior instanceof IsolatedLeader); - }}; + assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex()); } - @Test - public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { - logStart("testAppendEntryCallAtEndofAppendEntryReply"); + public void testHandleRequestVoteReply(){ + logStart("testHandleRequestVoteReply"); - MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + MockRaftActorContext leaderActorContext = createActorContext(); - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); - configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + leader = new Leader(leaderActorContext); - leaderActorContext.setConfigParams(configParams); + // Should be a no-op. + RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor, + new RequestVoteReply(1, true)); - MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + assertEquals(RaftState.Leader, raftActorBehavior.state()); - followerActorContext.setConfigParams(configParams); - followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false)); - Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().setBehavior(follower); + assertEquals(RaftState.Leader, raftActorBehavior.state()); + } - leaderActorContext.getReplicatedLog().removeFrom(0); - leaderActorContext.setCommitIndex(-1); - leaderActorContext.setLastApplied(-1); + @Test + public void testIsolatedLeaderCheckNoFollowers() { + logStart("testIsolatedLeaderCheckNoFollowers"); - followerActorContext.getReplicatedLog().removeFrom(0); - followerActorContext.setCommitIndex(-1); - followerActorContext.setLastApplied(-1); + MockRaftActorContext leaderActorContext = createActorContext(); leader = new Leader(leaderActorContext); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue(behavior instanceof Leader); + } - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching( - leaderActor, AppendEntriesReply.class); - - leader.handleMessage(followerActor, appendEntriesReply); + private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){ + ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1"); + ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2"); - // Clear initial heartbeat messages + MockRaftActorContext leaderActorContext = createActorContext(); - leaderActor.underlyingActor().clear(); - followerActor.underlyingActor().clear(); + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + peerAddresses.put("follower-2", followerActor2.path().toString()); - // create 3 entries - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - leaderActorContext.setCommitIndex(1); - leaderActorContext.setLastApplied(1); + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setRaftPolicy(raftPolicy); - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); + leader = new Leader(leaderActorContext); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.markFollowerActive("follower-1"); + leader.markFollowerActive("follower-2"); + RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue("Behavior not instance of Leader when all followers are active", + behavior instanceof Leader); - AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + // kill 1 follower and verify if that got killed + final JavaTestKit probe = new JavaTestKit(getSystem()); + probe.watch(followerActor1); + followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); + final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); + assertEquals(termMsg1.getActor(), followerActor1); + + leader.markFollowerInActive("follower-1"); + leader.markFollowerActive("follower-2"); + behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", + behavior instanceof Leader); - // Should send first log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(0, appendEntries.getEntries().get(0).getIndex()); - assertEquals(-1, appendEntries.getPrevLogIndex()); + // kill 2nd follower and leader should change to Isolated leader + followerActor2.tell(PoisonPill.getInstance(), null); + probe.watch(followerActor2); + followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender()); + final Terminated termMsg2 = probe.expectMsgClass(Terminated.class); + assertEquals(termMsg2.getActor(), followerActor2); - appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + leader.markFollowerInActive("follower-2"); + return leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); + } - assertEquals(1, appendEntriesReply.getLogLastTerm()); - assertEquals(0, appendEntriesReply.getLogLastIndex()); + @Test + public void testIsolatedLeaderCheckTwoFollowers() throws Exception { + logStart("testIsolatedLeaderCheckTwoFollowers"); - followerActor.underlyingActor().clear(); + RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); - leader.handleAppendEntriesReply(followerActor, appendEntriesReply); + Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", + behavior instanceof IsolatedLeader); + } - appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + @Test + public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception { + logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled"); - // Should send second log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getEntries().get(0).getIndex()); + RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); - follower.close(); + Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", + behavior instanceof Leader); } @Test @@ -1600,6 +1948,215 @@ public class LeaderTest extends AbstractLeaderTest { }}; } + @Test + public void testReplicationConsensusWithNonVotingFollower() { + logStart("testReplicationConsensusWithNonVotingFollower"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + String nonVotingFollowerId = "nonvoting-follower"; + TestActorRef nonVotingFollowerActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId)); + + leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); + + leader = new Leader(leaderActorContext); + + // Ignore initial heartbeats + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(nonVotingFollowerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Send a Replicate message and wait for AppendEntries. + sendReplicate(leaderActorContext, 0); + + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class); + + // Send reply only from the voting follower and verify consensus via ApplyState. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + + leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0)); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(nonVotingFollowerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Send another Replicate message + sendReplicate(leaderActorContext, 1); + + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, + AppendEntries.class); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + + // Send reply only from the non-voting follower and verify no consensus via no ApplyState. + leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0)); + + MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500); + + // Send reply from the voting follower and verify consensus. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); + + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + } + + @Test + public void testTransferLeadershipWithFollowerInSync() { + logStart("testTransferLeadershipWithFollowerInSync"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithEmptyLog() { + logStart("testTransferLeadershipWithEmptyLog"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerInitiallyOutOfSync() { + logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Sync up the follower. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). + getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerSyncTimeout() { + logStart("testTransferLeadershipWithFollowerSyncTimeout"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Send heartbeats to time out the transfer. + for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). + getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + } + + verify(mockTransferCohort).abortTransfer(); + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception {