+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.raft.behaviors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
-import com.google.common.base.Optional;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.RaftVersions;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.SerializationUtils;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.Snapshot;
+import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
-import org.opendaylight.controller.cluster.raft.base.messages.InitiateInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
+import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy;
+import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import org.opendaylight.controller.protobuff.messages.cluster.raft.InstallSnapshotMessages;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class LeaderTest extends AbstractRaftActorBehaviorTest {
- private ActorRef leaderActor =
- getSystem().actorOf(Props.create(DoNothingActor.class));
- private ActorRef senderActor =
- getSystem().actorOf(Props.create(DoNothingActor.class));
+public class LeaderTest extends AbstractLeaderTest {
- @Test
- public void testHandleMessageForUnknownMessage() throws Exception {
- new JavaTestKit(getSystem()) {{
- Leader leader =
- new Leader(createActorContext());
-
- // handle message should return the Leader state when it receives an
- // unknown message
- RaftActorBehavior behavior = leader.handleMessage(senderActor, "foo");
- Assert.assertTrue(behavior instanceof Leader);
- }};
- }
+ static final String FOLLOWER_ID = "follower";
+ public static final String LEADER_ID = "leader";
+ private final TestActorRef<ForwardMessageToBehaviorActor> leaderActor = actorFactory.createTestActor(
+ Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader"));
- @Test
- public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
- new JavaTestKit(getSystem()) {{
-
- new Within(duration("1 seconds")) {
- protected void run() {
+ private final TestActorRef<ForwardMessageToBehaviorActor> followerActor = actorFactory.createTestActor(
+ Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower"));
- ActorRef followerActor = getTestActor();
+ private Leader leader;
+ private final short payloadVersion = 5;
- MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ if(leader != null) {
+ leader.close();
+ }
- Map<String, String> peerAddresses = new HashMap<>();
+ super.tearDown();
+ }
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ @Test
+ public void testHandleMessageForUnknownMessage() throws Exception {
+ logStart("testHandleMessageForUnknownMessage");
- actorContext.setPeerAddresses(peerAddresses);
+ leader = new Leader(createActorContext());
- Leader leader = new Leader(actorContext);
- leader.handleMessage(senderActor, new SendHeartBeat());
+ // handle message should return the Leader state when it receives an
+ // unknown message
+ RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo");
+ Assert.assertTrue(behavior instanceof Leader);
+ }
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
+ @Test
+ public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception {
+ logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ short payloadVersion = (short)5;
+ actorContext.setPayloadVersion(payloadVersion);
+
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader should send an immediate heartbeat with no entries as follower is inactive.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getTerm", term, appendEntries.getTerm());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
+
+ // The follower would normally reply - simulate that explicitly here.
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().
+ getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
+ }
- assertEquals("match", out);
- }
- };
- }};
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){
+ return sendReplicate(actorContext, 1, index);
}
- @Test
- public void testHandleReplicateMessageSendAppendEntriesToFollower() {
- new JavaTestKit(getSystem()) {{
-
- new Within(duration("1 seconds")) {
- protected void run() {
-
- ActorRef followerActor = getTestActor();
-
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
-
- Map<String, String> peerAddresses = new HashMap<>();
-
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
-
- actorContext.setPeerAddresses(peerAddresses);
-
- Leader leader = new Leader(actorContext);
- RaftActorBehavior raftBehavior = leader
- .handleMessage(senderActor, new Replicate(null, null,
- new MockRaftActorContext.MockReplicatedLogEntry(1,
- 100,
- new MockRaftActorContext.MockPayload("foo"))
- ));
-
- // State should not change
- assertTrue(raftBehavior instanceof Leader);
-
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- Object msg = fromSerializableMessage(in);
- if (msg instanceof AppendEntries) {
- if (((AppendEntries)msg).getTerm() == 0) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
- }
- };
- }};
+ private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){
+ MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo");
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ term, index, payload);
+ actorContext.getReplicatedLog().append(newEntry);
+ return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry));
}
@Test
- public void testHandleReplicateMessageWhenThereAreNoFollowers() {
- new JavaTestKit(getSystem()) {{
+ public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception {
+ logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
- new Within(duration("1 seconds")) {
- protected void run() {
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- ActorRef raftActor = getTestActor();
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
- MockRaftActorContext actorContext =
- new MockRaftActorContext("test", getSystem(), raftActor);
+ leader = new Leader(actorContext);
- actorContext.getReplicatedLog().removeFrom(0);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- actorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1)
- .build());
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
- Leader leader = new Leader(actorContext);
- RaftActorBehavior raftBehavior = leader
- .handleMessage(senderActor, new Replicate(null, "state-id",actorContext.getReplicatedLog().get(1)));
+ followerActor.underlyingActor().clear();
- // State should not change
- assertTrue(raftBehavior instanceof Leader);
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
- assertEquals(1, actorContext.getCommitIndex());
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
- final String out =
- new ExpectMsg<String>(duration("1 seconds"),
- "match hint") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- if (in instanceof ApplyState) {
- if (((ApplyState) in).getIdentifier().equals("state-id")) {
- return "match";
- }
- return null;
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
+ }
- }
- };
- }};
+ @Test
+ public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception {
+ logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ // The raft context is initialized with a couple log entries. However the commitIndex
+ // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
+ // committed and applied. Now it regains leadership with a higher term (2).
+ long prevTerm = actorContext.getTermInformation().getCurrentTerm();
+ long newTerm = prevTerm + 1;
+ actorContext.getTermInformation().update(newTerm, "");
+
+ leader = new Leader(actorContext);
+
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // The follower replies with the leader's current last index and term, simulating that it is
+ // up to date with the leader.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0));
+
+ // The commit index should not get updated even though consensus was reached. This is b/c the
+ // last entry's term does match the current term. As per ยง5.4.1, "Raft never commits log entries
+ // from previous terms by counting replicas".
+ assertEquals("Commit Index", -1, actorContext.getCommitIndex());
+
+ followerActor.underlyingActor().clear();
+
+ // Now replicate a new entry with the new term 2.
+ long newIndex = lastIndex + 1;
+ sendReplicate(actorContext, newTerm, newIndex);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+
+ // The follower replies with success. The leader should now update the commit index to the new index
+ // as per ยง5.4.1 "once an entry from the current term is committed by counting replicas, then all
+ // prior entries are committed indirectly".
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0));
+
+ assertEquals("Commit Index", newIndex, actorContext.getCommitIndex());
}
@Test
- public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
- new JavaTestKit(getSystem()) {{
- ActorRef followerActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception {
+ logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus");
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
-
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext(leaderActor);
- actorContext.setPeerAddresses(peerAddresses);
-
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
-
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
-
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
-
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setCommitIndex(followersLastIndex);
- //set follower timeout to 2 mins, helps during debugging
- actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
-
- MockLeader leader = new MockLeader(actorContext);
-
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setRaftPolicy(createRaftPolicy(true, true));
- //update follower timestamp
- leader.markFollowerActive(followerActor.path().toString());
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
- ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
- leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
+ leader = new Leader(actorContext);
- //send first chunk and no InstallSnapshotReply received yet
- leader.getFollowerToSnapshot().getNextChunk();
- leader.getFollowerToSnapshot().incrementChunkIndex();
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short) 0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
- AppendEntriesMessages.AppendEntries aeproto = (AppendEntriesMessages.AppendEntries)MessageCollectorActor.getFirstMatching(
- followerActor, AppendEntries.SERIALIZABLE_CLASS);
+ RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1);
- assertNotNull("AppendEntries should be sent even if InstallSnapshotReply is not " +
- "received", aeproto);
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
- AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
+ assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex());
+ }
- assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
+ @Test
+ public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception {
+ logStart("testHandleReplicateMessageSendAppendEntriesToFollower");
- //InstallSnapshotReply received
- leader.getFollowerToSnapshot().markSendStatus(true);
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
- leader.handleMessage(senderActor, new SendHeartBeat());
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
- InstallSnapshotMessages.InstallSnapshot isproto = (InstallSnapshotMessages.InstallSnapshot)
- MessageCollectorActor.getFirstMatching(followerActor,
- InstallSnapshot.SERIALIZABLE_CLASS);
+ leader = new Leader(actorContext);
- assertNotNull("Installsnapshot should get called for sending the next chunk of snapshot",
- isproto);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- InstallSnapshot is = (InstallSnapshot) SerializationUtils.fromSerializable(isproto);
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
- assertEquals(snapshotIndex, is.getLastIncludedIndex());
+ followerActor.underlyingActor().clear();
- }};
+ for(int i=0;i<5;i++) {
+ sendReplicate(actorContext, lastIndex+i+1);
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect only 1 message to be sent because of two reasons,
+ // - an append entries reply was not received
+ // - the heartbeat interval has not expired
+ // In this scenario if multiple messages are sent they would likely be duplicates
+ assertEquals("The number of append entries collected should be 1", 1, allMessages.size());
}
@Test
- public void testSendAppendEntriesSnapshotScenario() {
- new JavaTestKit(getSystem()) {{
+ public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception {
+ logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(5, TimeUnit.SECONDS);
+ }
+ });
- ActorRef followerActor = getTestActor();
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ leader = new Leader(actorContext);
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext(getRef());
- actorContext.setPeerAddresses(peerAddresses);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ followerActor.underlyingActor().clear();
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ for(int i=0;i<3;i++) {
+ sendReplicate(actorContext, lastIndex+i+1);
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0));
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setCommitIndex(followersLastIndex);
+ }
- Leader leader = new Leader(actorContext);
+ for(int i=3;i<5;i++) {
+ sendReplicate(actorContext, lastIndex + i + 1);
+ }
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
- //update follower timestamp
- leader.markFollowerActive(followerActor.path().toString());
-
- // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
- RaftActorBehavior raftBehavior = leader.handleMessage(
- senderActor, new Replicate(null, "state-id", entry));
-
- assertTrue(raftBehavior instanceof Leader);
-
- // we might receive some heartbeat messages, so wait till we InitiateInstallSnapshot
- Boolean[] matches = new ReceiveWhile<Boolean>(Boolean.class, duration("2 seconds")) {
- @Override
- protected Boolean match(Object o) throws Exception {
- if (o instanceof InitiateInstallSnapshot) {
- return true;
- }
- return false;
- }
- }.get();
-
- boolean initiateInitiateInstallSnapshot = false;
- for (Boolean b: matches) {
- initiateInitiateInstallSnapshot = b | initiateInitiateInstallSnapshot;
- }
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would
+ // get sent to the follower - but not the 5th
+ assertEquals("The number of append entries collected should be 4", 4, allMessages.size());
- assertTrue(initiateInitiateInstallSnapshot);
- }};
+ for(int i=0;i<4;i++) {
+ long expected = allMessages.get(i).getEntries().get(0).getIndex();
+ assertEquals(expected, i+2);
+ }
}
@Test
- public void testInitiateInstallSnapshot() throws Exception {
- new JavaTestKit(getSystem()) {{
+ public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception {
+ logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat");
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(500, TimeUnit.MILLISECONDS);
+ }
+ });
- ActorRef followerActor = getTestActor();
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ leader = new Leader(actorContext);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext(leaderActor);
- actorContext.setPeerAddresses(peerAddresses);
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ followerActor.underlyingActor().clear();
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ sendReplicate(actorContext, lastIndex+1);
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ // Wait slightly longer than heartbeat duration
+ Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.setLastApplied(3);
- actorContext.setCommitIndex(followersLastIndex);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- Leader leader = new Leader(actorContext);
- // set the snapshot as absent and check if capture-snapshot is invoked.
- leader.setSnapshot(Optional.<ByteString>absent());
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
+ assertEquals(1, allMessages.get(0).getEntries().size());
+ assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+ assertEquals(1, allMessages.get(1).getEntries().size());
+ assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex());
+
+ }
- actorContext.getReplicatedLog().append(entry);
+ @Test
+ public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception {
+ logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
- // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
- RaftActorBehavior raftBehavior = leader.handleMessage(
- leaderActor, new InitiateInstallSnapshot());
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
- CaptureSnapshot cs = (CaptureSnapshot) MessageCollectorActor.
- getFirstMatching(leaderActor, CaptureSnapshot.class);
+ leader = new Leader(actorContext);
- assertNotNull(cs);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertTrue(cs.isInstallSnapshotInitiated());
- assertEquals(3, cs.getLastAppliedIndex());
- assertEquals(1, cs.getLastAppliedTerm());
- assertEquals(4, cs.getLastIndex());
- assertEquals(2, cs.getLastTerm());
- }};
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
+
+ followerActor.underlyingActor().clear();
+
+ for(int i=0;i<3;i++) {
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ }
+
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 3", 3, allMessages.size());
}
@Test
- public void testInstallSnapshot() {
- new JavaTestKit(getSystem()) {{
-
- ActorRef followerActor = getTestActor();
+ public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception {
+ logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate");
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public FiniteDuration getHeartBeatInterval() {
+ return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+ }
+ });
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
- actorContext.setPeerAddresses(peerAddresses);
+ long term = 1;
+ actorContext.getTermInformation().update(term, "");
+ leader = new Leader(actorContext);
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ // The follower would normally reply - simulate that explicitly here.
+ long lastIndex = actorContext.getReplicatedLog().lastIndex();
+ leader.handleMessage(followerActor, new AppendEntriesReply(
+ FOLLOWER_ID, term, true, lastIndex, term, (short)0));
+ assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive());
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
+ followerActor.underlyingActor().clear();
- // set the snapshot variables in replicatedlog
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- actorContext.setCommitIndex(followersLastIndex);
+ Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ sendReplicate(actorContext, lastIndex+1);
- Leader leader = new Leader(actorContext);
+ List<AppendEntries> allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class);
+ assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
- // new entry
- ReplicatedLogImplEntry entry =
- new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
- RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
- new SendInstallSnapshot(toByteString(leadersSnapshot)));
-
- assertTrue(raftBehavior instanceof Leader);
-
- // check if installsnapshot gets called with the correct values.
- final String out =
- new ExpectMsg<String>(duration("1 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- protected String match(Object in) {
- if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
- InstallSnapshot is = (InstallSnapshot)
- SerializationUtils.fromSerializable(in);
- if (is.getData() == null) {
- return "InstallSnapshot data is null";
- }
- if (is.getLastIncludedIndex() != snapshotIndex) {
- return is.getLastIncludedIndex() + "!=" + snapshotIndex;
- }
- if (is.getLastIncludedTerm() != snapshotTerm) {
- return is.getLastIncludedTerm() + "!=" + snapshotTerm;
- }
- if (is.getTerm() == currentTerm) {
- return is.getTerm() + "!=" + currentTerm;
- }
-
- return "match";
-
- } else {
- return "message mismatch:" + in.getClass();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", out);
- }};
+ assertEquals(0, allMessages.get(0).getEntries().size());
+ assertEquals(1, allMessages.get(1).getEntries().size());
}
+
@Test
- public void testHandleInstallSnapshotReplyLastChunk() {
- new JavaTestKit(getSystem()) {{
+ public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception {
+ logStart("testHandleReplicateMessageWhenThereAreNoFollowers");
- ActorRef followerActor = getTestActor();
+ MockRaftActorContext actorContext = createActorContext();
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
-
- final int followersLastIndex = 2;
- final int snapshotIndex = 3;
- final int newEntryIndex = 4;
- final int snapshotTerm = 1;
- final int currentTerm = 2;
-
- MockRaftActorContext actorContext =
- (MockRaftActorContext) createActorContext();
- actorContext.setPeerAddresses(peerAddresses);
- actorContext.setCommitIndex(followersLastIndex);
-
- MockLeader leader = new MockLeader(actorContext);
-
- Map<String, String> leadersSnapshot = new HashMap<>();
- leadersSnapshot.put("1", "A");
- leadersSnapshot.put("2", "B");
- leadersSnapshot.put("3", "C");
-
- // set the snapshot variables in replicatedlog
-
- actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
- actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
- actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
-
- ByteString bs = toByteString(leadersSnapshot);
- leader.setSnapshot(Optional.of(bs));
- leader.createFollowerToSnapshot(followerActor.path().toString(), bs);
- while(!leader.getFollowerToSnapshot().isLastChunk(leader.getFollowerToSnapshot().getChunkIndex())) {
- leader.getFollowerToSnapshot().getNextChunk();
- leader.getFollowerToSnapshot().incrementChunkIndex();
- }
+ leader = new Leader(actorContext);
- //clears leaders log
- actorContext.getReplicatedLog().removeFrom(0);
+ actorContext.setLastApplied(0);
- RaftActorBehavior raftBehavior = leader.handleMessage(senderActor,
- new InstallSnapshotReply(currentTerm, followerActor.path().toString(),
- leader.getFollowerToSnapshot().getChunkIndex(), true));
+ long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
+ long term = actorContext.getTermInformation().getCurrentTerm();
+ MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry(
+ term, newLogIndex, new MockRaftActorContext.MockPayload("foo"));
- assertTrue(raftBehavior instanceof Leader);
+ actorContext.getReplicatedLog().append(newEntry);
- assertEquals(leader.mapFollowerToSnapshot.size(), 0);
- assertEquals(leader.followerToLog.size(), 1);
- assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
- FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
- }};
- }
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
+ new Replicate(leaderActor, "state-id", newEntry));
- @Test
- public void testFollowerToSnapshotLogic() {
+ // State should not change
+ assertTrue(raftBehavior instanceof Leader);
- MockRaftActorContext actorContext = (MockRaftActorContext) createActorContext();
+ assertEquals("getCommitIndex", newLogIndex, actorContext.getCommitIndex());
- actorContext.setConfigParams(new DefaultConfigParamsImpl() {
- @Override
- public int getSnapshotChunkSize() {
- return 50;
- }
- });
+ // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
+ // one since lastApplied state is 0.
+ List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
+ leaderActor, ApplyState.class);
+ assertEquals("ApplyState count", newLogIndex, applyStateList.size());
+
+ for(int i = 0; i <= newLogIndex - 1; i++ ) {
+ ApplyState applyState = applyStateList.get(i);
+ assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
+ }
+
+ ApplyState last = applyStateList.get((int) newLogIndex - 1);
+ assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
+ assertEquals("getIdentifier", "state-id", last.getIdentifier());
+ }
+
+ @Test
+ public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception {
+ logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot");
- MockLeader leader = new MockLeader(actorContext);
+ MockRaftActorContext actorContext = createActorContextWithFollower();
Map<String, String> leadersSnapshot = new HashMap<>();
leadersSnapshot.put("1", "A");
leadersSnapshot.put("2", "B");
leadersSnapshot.put("3", "C");
- ByteString bs = toByteString(leadersSnapshot);
- byte[] barray = bs.toByteArray();
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- leader.createFollowerToSnapshot("followerId", bs);
- assertEquals(bs.size(), barray.length);
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- int chunkIndex=0;
- for (int i=0; i < barray.length; i = i + 50) {
- int j = i + 50;
- chunkIndex++;
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(commitIndex);
+ //set follower timeout to 2 mins, helps during debugging
+ actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10));
- if (i + 50 > barray.length) {
- j = barray.length;
- }
+ leader = new Leader(actorContext);
- ByteString chunk = leader.getFollowerToSnapshot().getNextChunk();
- assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
- assertEquals("chunkindex not matching", chunkIndex, leader.getFollowerToSnapshot().getChunkIndex());
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- leader.getFollowerToSnapshot().markSendStatus(true);
- if (!leader.getFollowerToSnapshot().isLastChunk(chunkIndex)) {
- leader.getFollowerToSnapshot().incrementChunkIndex();
- }
- }
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- assertEquals("totalChunks not matching", chunkIndex, leader.getFollowerToSnapshot().getTotalChunks());
- }
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ leader.setFollowerSnapshot(FOLLOWER_ID, fts);
- @Override protected RaftActorBehavior createBehavior(
- RaftActorContext actorContext) {
- return new Leader(actorContext);
- }
+ //send first chunk and no InstallSnapshotReply received yet
+ fts.getNextChunk();
+ fts.incrementChunkIndex();
- @Override protected RaftActorContext createActorContext() {
- return createActorContext(leaderActor);
- }
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
- protected RaftActorContext createActorContext(ActorRef actorRef) {
- return new MockRaftActorContext("test", getSystem(), actorRef);
- }
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- private ByteString toByteString(Map<String, String> state) {
- ByteArrayOutputStream b = null;
- ObjectOutputStream o = null;
- try {
- try {
- b = new ByteArrayOutputStream();
- o = new ObjectOutputStream(b);
- o.writeObject(state);
- byte[] snapshotBytes = b.toByteArray();
- return ByteString.copyFrom(snapshotBytes);
- } finally {
- if (o != null) {
- o.flush();
- o.close();
- }
- if (b != null) {
- b.close();
- }
- }
- } catch (IOException e) {
- Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
- }
- return null;
- }
+ AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- public static class ForwardMessageToBehaviorActor extends MessageCollectorActor {
- private static AbstractRaftActorBehavior behavior;
+ AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto);
- public ForwardMessageToBehaviorActor(){
+ assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty());
- }
+ //InstallSnapshotReply received
+ fts.markSendStatus(true);
- @Override public void onReceive(Object message) throws Exception {
- super.onReceive(message);
- behavior.handleMessage(sender(), message);
- }
+ leader.handleMessage(leaderActor, new SendHeartBeat());
- public static void setBehavior(AbstractRaftActorBehavior behavior){
- ForwardMessageToBehaviorActor.behavior = behavior;
- }
+ InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertEquals(commitIndex, is.getLastIncludedIndex());
}
@Test
- public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
- new JavaTestKit(getSystem()) {{
+ public void testSendAppendEntriesSnapshotScenario() throws Exception {
+ logStart("testSendAppendEntriesSnapshotScenario");
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- Follower follower = new Follower(followerActorContext);
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setCommitIndex(followersLastIndex);
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ leader = new Leader(actorContext);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- leaderActorContext.setPeerAddresses(peerAddresses);
+ // new entry
+ ReplicatedLogImplEntry entry =
+ new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- leaderActorContext.getReplicatedLog().removeFrom(0);
+ actorContext.getReplicatedLog().append(entry);
- //create 3 entries
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
- leaderActorContext.setCommitIndex(1);
+ // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
+ RaftActorBehavior raftBehavior = leader.handleMessage(
+ leaderActor, new Replicate(null, "state-id", entry));
- followerActorContext.getReplicatedLog().removeFrom(0);
+ assertTrue(raftBehavior instanceof Leader);
- // follower too has the exact same log entries and has the same commit index
- followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+ }
- followerActorContext.setCommitIndex(1);
+ @Test
+ public void testInitiateInstallSnapshot() throws Exception {
+ logStart("testInitiateInstallSnapshot");
- Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive(followerActor.path().toString());
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- AppendEntriesMessages.AppendEntries appendEntries =
- (AppendEntriesMessages.AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- assertNotNull(appendEntries);
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setLastApplied(3);
+ actorContext.setCommitIndex(followersLastIndex);
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getLogEntries(0).getIndex());
- assertEquals(0, appendEntries.getPrevLogIndex());
+ leader = new Leader(actorContext);
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
- leaderActor, AppendEntriesReply.class);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertNotNull(appendEntriesReply);
+ // set the snapshot as absent and check if capture-snapshot is invoked.
+ leader.setSnapshot(null);
- // follower returns its next index
- assertEquals(2, appendEntriesReply.getLogLastIndex());
- assertEquals(1, appendEntriesReply.getLogLastTerm());
+ // new entry
+ ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- }};
- }
+ actorContext.getReplicatedLog().append(entry);
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
- @Test
- public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
- new JavaTestKit(getSystem()) {{
+ leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
- ActorRef followerActor = getSystem().actorOf(
- Props.create(ForwardMessageToBehaviorActor.class));
+ assertTrue(cs.isInstallSnapshotInitiated());
+ assertEquals(3, cs.getLastAppliedIndex());
+ assertEquals(1, cs.getLastAppliedTerm());
+ assertEquals(4, cs.getLastIndex());
+ assertEquals(2, cs.getLastTerm());
- MockRaftActorContext followerActorContext =
- new MockRaftActorContext("follower", getSystem(), followerActor);
+ // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+ leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- Follower follower = new Follower(followerActorContext);
+ Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ }
- ForwardMessageToBehaviorActor.setBehavior(follower);
+ @Test
+ public void testInitiateForceInstallSnapshot() throws Exception {
+ logStart("testInitiateForceInstallSnapshot");
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put(followerActor.path().toString(),
- followerActor.path().toString());
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- leaderActorContext.setPeerAddresses(peerAddresses);
+ final int followersLastIndex = 2;
+ final int snapshotIndex = -1;
+ final int newEntryIndex = 4;
+ final int snapshotTerm = -1;
+ final int currentTerm = 2;
- leaderActorContext.getReplicatedLog().removeFrom(0);
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.setLastApplied(3);
+ actorContext.setCommitIndex(followersLastIndex);
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ actorContext.getReplicatedLog().removeFrom(0);
- leaderActorContext.setCommitIndex(1);
+ leader = new Leader(actorContext);
- followerActorContext.getReplicatedLog().removeFrom(0);
+ // Leader will send an immediate heartbeat - ignore it.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- followerActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ // set the snapshot as absent and check if capture-snapshot is invoked.
+ leader.setSnapshot(null);
- // follower has the same log entries but its commit index > leaders commit index
- followerActorContext.setCommitIndex(2);
+ for(int i=0;i<4;i++) {
+ actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+ new MockRaftActorContext.MockPayload("X" + i)));
+ }
- Leader leader = new Leader(leaderActorContext);
- leader.markFollowerActive(followerActor.path().toString());
+ // new entry
+ ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+ new MockRaftActorContext.MockPayload("D"));
- leader.handleMessage(leaderActor, new SendHeartBeat());
+ actorContext.getReplicatedLog().append(entry);
- AppendEntriesMessages.AppendEntries appendEntries =
- (AppendEntriesMessages.AppendEntries) MessageCollectorActor
- .getFirstMatching(followerActor, AppendEntriesMessages.AppendEntries.class);
+ //update follower timestamp
+ leader.markFollowerActive(FOLLOWER_ID);
- assertNotNull(appendEntries);
+ // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
+ // installed with a SendInstallSnapshot
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
- assertEquals(1, appendEntries.getLeaderCommit());
- assertEquals(1, appendEntries.getLogEntries(0).getIndex());
- assertEquals(0, appendEntries.getPrevLogIndex());
+ assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
- AppendEntriesReply appendEntriesReply =
- (AppendEntriesReply) MessageCollectorActor.getFirstMatching(
- leaderActor, AppendEntriesReply.class);
+ CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
- assertNotNull(appendEntriesReply);
+ assertTrue(cs.isInstallSnapshotInitiated());
+ assertEquals(3, cs.getLastAppliedIndex());
+ assertEquals(1, cs.getLastAppliedTerm());
+ assertEquals(4, cs.getLastIndex());
+ assertEquals(2, cs.getLastTerm());
- assertEquals(2, appendEntriesReply.getLogLastIndex());
- assertEquals(1, appendEntriesReply.getLogLastTerm());
+ // if an initiate is started again when first is in progress, it shouldnt initiate Capture
+ leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
- }};
+ Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
+
@Test
- public void testHandleAppendEntriesReplyFailure(){
- new JavaTestKit(getSystem()) {
- {
+ public void testInstallSnapshot() throws Exception {
+ logStart("testInstallSnapshot");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- ActorRef leaderActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- ActorRef followerActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ final int lastAppliedIndex = 3;
+ final int snapshotIndex = 2;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ actorContext.setCommitIndex(lastAppliedIndex);
+ actorContext.setLastApplied(lastAppliedIndex);
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ leader = new Leader(actorContext);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower-1",
- followerActor.path().toString());
+ // Initial heartbeat.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- leaderActorContext.setPeerAddresses(peerAddresses);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- Leader leader = new Leader(leaderActorContext);
+ Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
- AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+ assertTrue(raftBehavior instanceof Leader);
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ // check if installsnapshot gets called with the correct values.
- }};
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertNotNull(installSnapshot.getData());
+ assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
+ assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
+
+ assertEquals(currentTerm, installSnapshot.getTerm());
}
@Test
- public void testHandleAppendEntriesReplySuccess() throws Exception {
- new JavaTestKit(getSystem()) {
- {
+ public void testForceInstallSnapshot() throws Exception {
+ logStart("testForceInstallSnapshot");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ final int lastAppliedIndex = 3;
+ final int snapshotIndex = -1;
+ final int snapshotTerm = -1;
+ final int currentTerm = 2;
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+ actorContext.setCommitIndex(lastAppliedIndex);
+ actorContext.setLastApplied(lastAppliedIndex);
+
+ leader = new Leader(actorContext);
- ActorRef leaderActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ // Initial heartbeat.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- ActorRef followerActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
+ Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(),
+ lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
- leaderActorContext.setReplicatedLog(
- new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ assertTrue(raftBehavior instanceof Leader);
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower-1",
- followerActor.path().toString());
+ // check if installsnapshot gets called with the correct values.
+
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertNotNull(installSnapshot.getData());
+ assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
+ assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
+
+ assertEquals(currentTerm, installSnapshot.getTerm());
+ }
- leaderActorContext.setPeerAddresses(peerAddresses);
- leaderActorContext.setCommitIndex(1);
- leaderActorContext.setLastApplied(1);
- leaderActorContext.getTermInformation().update(1, "leader");
+ @Test
+ public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
+ logStart("testHandleInstallSnapshotReplyLastChunk");
- Leader leader = new Leader(leaderActorContext);
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+ actorContext.setCommitIndex(commitIndex);
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ leader = new Leader(actorContext);
- assertEquals(2, leaderActorContext.getCommitIndex());
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- ApplyLogEntries applyLogEntries =
- (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
- ApplyLogEntries.class);
+ // Ignore initial heartbeat.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertNotNull(applyLogEntries);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- assertEquals(2, leaderActorContext.getLastApplied());
+ // set the snapshot variables in replicatedlog
- assertEquals(2, applyLogEntries.getToIndex());
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
- ApplyState.class);
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm));
+ FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+ while(!fts.isLastChunk(fts.getChunkIndex())) {
+ fts.getNextChunk();
+ fts.incrementChunkIndex();
+ }
- assertEquals(1,applyStateList.size());
+ //clears leaders log
+ actorContext.getReplicatedLog().removeFrom(0);
- ApplyState applyState = (ApplyState) applyStateList.get(0);
+ RaftActorBehavior raftBehavior = leader.handleMessage(followerActor,
+ new InstallSnapshotReply(currentTerm, FOLLOWER_ID, fts.getChunkIndex(), true));
- assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+ assertTrue(raftBehavior instanceof Leader);
- }};
+ assertEquals(0, leader.followerSnapshotSize());
+ assertEquals(1, leader.followerLogSize());
+ FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID);
+ assertNotNull(fli);
+ assertEquals(commitIndex, fli.getMatchIndex());
+ assertEquals(commitIndex + 1, fli.getNextIndex());
}
@Test
- public void testHandleAppendEntriesReplyUnknownFollower(){
- new JavaTestKit(getSystem()) {
- {
+ public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+ logStart("testSendSnapshotfromInstallSnapshotReply");
- ActorRef leaderActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- Leader leader = new Leader(leaderActorContext);
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ };
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
- AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+ actorContext.setConfigParams(configParams);
+ actorContext.setCommitIndex(commitIndex);
- RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
+ leader = new Leader(actorContext);
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- }};
- }
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- @Test
- public void testHandleRequestVoteReply(){
- new JavaTestKit(getSystem()) {
- {
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- ActorRef leaderActor =
- getSystem().actorOf(Props.create(MessageCollectorActor.class));
+ ByteString bs = toByteString(leadersSnapshot);
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
- Leader leader = new Leader(leaderActorContext);
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
- RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(RaftState.Leader, raftActorBehavior.state());
+ followerActor.underlyingActor().clear();
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+ installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
- assertEquals(RaftState.Leader, raftActorBehavior.state());
- }};
- }
+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
- @Test
- public void testIsolatedLeaderCheckNoFollowers() {
- new JavaTestKit(getSystem()) {{
- ActorRef leaderActor = getTestActor();
+ followerActor.underlyingActor().clear();
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- MockRaftActorContext leaderActorContext =
- new MockRaftActorContext("leader", getSystem(), leaderActor);
+ installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
- Map<String, String> peerAddresses = new HashMap<>();
- leaderActorContext.setPeerAddresses(peerAddresses);
+ // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+ followerActor.underlyingActor().clear();
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ FOLLOWER_ID, installSnapshot.getChunkIndex(), true));
- Leader leader = new Leader(leaderActorContext);
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue(behavior instanceof Leader);
- }};
+ installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class);
+
+ Assert.assertNull(installSnapshot);
}
+
@Test
- public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
- new JavaTestKit(getSystem()) {{
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
+ logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex");
- ActorRef followerActor1 = getTestActor();
- ActorRef followerActor2 = getTestActor();
+ MockRaftActorContext actorContext = createActorContextWithFollower();
- MockRaftActorContext leaderActorContext = (MockRaftActorContext) createActorContext();
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
- Map<String, String> peerAddresses = new HashMap<>();
- peerAddresses.put("follower-1", followerActor1.path().toString());
- peerAddresses.put("follower-2", followerActor2.path().toString());
+ actorContext.setConfigParams(new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
- leaderActorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(commitIndex);
- Leader leader = new Leader(leaderActorContext);
- leader.stopIsolatedLeaderCheckSchedule();
+ leader = new Leader(actorContext);
- leader.markFollowerActive("follower-1");
- leader.markFollowerActive("follower-2");
- RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue("Behavior not instance of Leader when all followers are active",
- behavior instanceof Leader);
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
- // kill 1 follower and verify if that got killed
- final JavaTestKit probe = new JavaTestKit(getSystem());
- probe.watch(followerActor1);
- followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
- final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
- assertEquals(termMsg1.getActor(), followerActor1);
-
- leader.markFollowerInActive("follower-1");
- leader.markFollowerActive("follower-2");
- behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
- behavior instanceof Leader);
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
- // kill 2nd follower and leader should change to Isolated leader
- followerActor2.tell(PoisonPill.getInstance(), null);
- probe.watch(followerActor2);
- followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
- final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
- assertEquals(termMsg2.getActor(), followerActor2);
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
- leader.markFollowerInActive("follower-2");
- behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
- Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
- behavior instanceof IsolatedLeader);
+ ByteString bs = toByteString(leadersSnapshot);
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
- }};
- }
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
- class MockLeader extends Leader {
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
- FollowerToSnapshot fts;
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
- public MockLeader(RaftActorContext context){
- super(context);
- }
+ followerActor.underlyingActor().clear();
- public FollowerToSnapshot getFollowerToSnapshot() {
- return fts;
- }
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ FOLLOWER_ID, -1, false));
- public void createFollowerToSnapshot(String followerId, ByteString bs ) {
- fts = new FollowerToSnapshot(bs);
- mapFollowerToSnapshot.put(followerId, fts);
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
- }
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ }
+
+ @Test
+ public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception {
+ logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk");
+
+ MockRaftActorContext actorContext = createActorContextWithFollower();
+
+ final int commitIndex = 3;
+ final int snapshotIndex = 2;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+
+ actorContext.setCommitIndex(commitIndex);
+
+ leader = new Leader(actorContext);
+
+ leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+ leader.getFollower(FOLLOWER_ID).setNextIndex(0);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
+ commitIndex, snapshotTerm, commitIndex, snapshotTerm);
+ leader.setSnapshot(snapshot);
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+
+ InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue());
+
+ int hashCode = installSnapshot.getData().hashCode();
+
+ followerActor.underlyingActor().clear();
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),
+ FOLLOWER_ID, 1, true));
+
+ installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ assertEquals(2, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+ assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue());
+ }
+
+ @Test
+ public void testFollowerToSnapshotLogic() {
+ logStart("testFollowerToSnapshotLogic");
+
+ MockRaftActorContext actorContext = createActorContext();
+
+ actorContext.setConfigParams(new DefaultConfigParamsImpl() {
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ });
+
+ leader = new Leader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ ByteString bs = toByteString(leadersSnapshot);
+ byte[] barray = bs.toByteArray();
+
+ FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs);
+ leader.setFollowerSnapshot(FOLLOWER_ID, fts);
+
+ assertEquals(bs.size(), barray.length);
+
+ int chunkIndex=0;
+ for (int i=0; i < barray.length; i = i + 50) {
+ int j = i + 50;
+ chunkIndex++;
+
+ if (i + 50 > barray.length) {
+ j = barray.length;
+ }
+
+ ByteString chunk = fts.getNextChunk();
+ assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size());
+ assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex());
+
+ fts.markSendStatus(true);
+ if (!fts.isLastChunk(chunkIndex)) {
+ fts.incrementChunkIndex();
+ }
+ }
+
+ assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+ }
+
+ @Override protected RaftActorBehavior createBehavior(
+ RaftActorContext actorContext) {
+ return new Leader(actorContext);
+ }
+
+ @Override
+ protected MockRaftActorContext createActorContext() {
+ return createActorContext(leaderActor);
+ }
+
+ @Override
+ protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ return createActorContext(LEADER_ID, actorRef);
+ }
+
+ private MockRaftActorContext createActorContextWithFollower() {
+ MockRaftActorContext actorContext = createActorContext();
+ actorContext.setPeerAddresses(ImmutableMap.<String, String>builder().put(FOLLOWER_ID,
+ followerActor.path().toString()).build());
+ return actorContext;
+ }
+
+ private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
+ configParams.setElectionTimeoutFactor(100000);
+ MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef);
+ context.setConfigParams(configParams);
+ context.setPayloadVersion(payloadVersion);
+ return context;
+ }
+
+ private MockRaftActorContext createFollowerActorContextWithLeader() {
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl();
+ followerConfig.setElectionTimeoutFactor(10000);
+ followerActorContext.setConfigParams(followerConfig);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+ return followerActorContext;
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception {
+ logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ // follower too has the exact same log entries and has the same commit index
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ followerActorContext.setCommitIndex(1);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ // follower returns its next index
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ follower.close();
+ }
+
+ @Test
+ public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception {
+ logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor);
+ followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString()));
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ Map<String, String> leaderPeerAddresses = new HashMap<>();
+ leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(leaderPeerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ followerActorContext.getReplicatedLog().removeFrom(0);
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ // follower has the same log entries but its commit index > leaders commit index
+ followerActorContext.setCommitIndex(2);
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(
+ leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ leaderActor.underlyingActor().setBehavior(follower);
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ leaderActor.underlyingActor().clear();
+ followerActor.underlyingActor().clear();
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals(2, appendEntries.getLeaderCommit());
+ assertEquals(0, appendEntries.getEntries().size());
+ assertEquals(2, appendEntries.getPrevLogIndex());
+
+ appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(2, appendEntriesReply.getLogLastIndex());
+ assertEquals(1, appendEntriesReply.getLogLastTerm());
+
+ assertEquals(2, followerActorContext.getCommitIndex());
+
+ follower.close();
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+ long leaderCommitIndex = 2;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(0);
+ followerActorContext.setLastApplied(0);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersThirdLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 3, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() {
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){
+ logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ long leaderCommitIndex = 1;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1);
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 2, followerInfo.getNextIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
+
+ ApplyState applyState = applyStateList.get(0);
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ applyState = applyStateList.get(1);
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
+ applyState.getReplicatedLogEntry().getData());
+
+ assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex());
+ assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyWithNewerTerm(){
+ logStart("testHandleAppendEntriesReplyWithNewerTerm");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(10000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+
+ leader = new Leader(leaderActorContext);
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(false, appendEntriesReply.isSuccess());
+ assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+ MessageCollectorActor.clearMessages(leaderActor);
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){
+ logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(10000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build());
+ leaderActorContext.setRaftPolicy(createRaftPolicy(false, false));
+
+ leader = new Leader(leaderActorContext);
+ leaderActor.underlyingActor().setBehavior(leader);
+ leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender());
+
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertEquals(false, appendEntriesReply.isSuccess());
+ assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state());
+
+ MessageCollectorActor.clearMessages(leaderActor);
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplySuccess() throws Exception {
+ logStart("testHandleAppendEntriesReplySuccess");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+ leaderActorContext.setLastApplied(1);
+ leaderActorContext.getTermInformation().update(1, "leader");
+
+ leader = new Leader(leaderActorContext);
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+
+ assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
+ assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
+
+ short payloadVersion = 5;
+ AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ assertEquals(2, leaderActorContext.getCommitIndex());
+
+ ApplyJournalEntries applyJournalEntries = MessageCollectorActor.expectFirstMatching(
+ leaderActor, ApplyJournalEntries.class);
+
+ assertEquals(2, leaderActorContext.getLastApplied());
+
+ assertEquals(2, applyJournalEntries.getToIndex());
+
+ List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+ ApplyState.class);
+
+ assertEquals(1,applyStateList.size());
+
+ ApplyState applyState = applyStateList.get(0);
+
+ assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+ assertEquals(2, followerInfo.getMatchIndex());
+ assertEquals(3, followerInfo.getNextIndex());
+ assertEquals(payloadVersion, followerInfo.getPayloadVersion());
+ assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion());
+ }
+
+ @Test
+ public void testHandleAppendEntriesReplyUnknownFollower(){
+ logStart("testHandleAppendEntriesReplyUnknownFollower");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0);
+
+ RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }
+
+ @Test
+ public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() {
+ logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
+ long leaderCommitIndex = 3;
+ leaderActorContext.setCommitIndex(leaderCommitIndex);
+ leaderActorContext.setLastApplied(leaderCommitIndex);
+
+ ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0);
+ ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1);
+ ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2);
+ ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3);
+
+ MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader();
+
+ followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ followerActorContext.setCommitIndex(-1);
+ followerActorContext.setLastApplied(-1);
+
+ Follower follower = new Follower(followerActorContext);
+ followerActor.underlyingActor().setBehavior(follower);
+
+ leader = new Leader(leaderActorContext);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Verify initial AppendEntries sent with the leader's current commit index.
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("Log entries size", 0, appendEntries.getEntries().size());
+ assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ leader.handleMessage(followerActor, appendEntriesReply);
+
+ List<AppendEntries> appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+ MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2);
+
+ appendEntries = appendEntriesList.get(0);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersFirstLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersSecondLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ appendEntries = appendEntriesList.get(1);
+ assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+ assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+ assertEquals("Log entries size", 2, appendEntries.getEntries().size());
+
+ assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry data", leadersThirdLogEntry.getData(),
+ appendEntries.getEntries().get(0).getData());
+ assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry data", leadersFourthLogEntry.getData(),
+ appendEntries.getEntries().get(1).getData());
+
+ FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
+ assertEquals("getNextIndex", 4, followerInfo.getNextIndex());
+
+ MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4);
+
+ assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex());
+ assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex());
+ }
+
+ @Test
+ public void testHandleRequestVoteReply(){
+ logStart("testHandleRequestVoteReply");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+
+ // Should be a no-op.
+ RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(followerActor,
+ new RequestVoteReply(1, true));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+ raftActorBehavior = leader.handleRequestVoteReply(followerActor, new RequestVoteReply(1, false));
+
+ assertEquals(RaftState.Leader, raftActorBehavior.state());
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckNoFollowers() {
+ logStart("testIsolatedLeaderCheckNoFollowers");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ leader = new Leader(leaderActorContext);
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue(behavior instanceof Leader);
+ }
+
+ private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){
+ ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
+ ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
+
+ MockRaftActorContext leaderActorContext = createActorContext();
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-1", followerActor1.path().toString());
+ peerAddresses.put("follower-2", followerActor2.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.setRaftPolicy(raftPolicy);
+
+ leader = new Leader(leaderActorContext);
+
+ leader.markFollowerActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when all followers are active",
+ behavior instanceof Leader);
+
+ // kill 1 follower and verify if that got killed
+ final JavaTestKit probe = new JavaTestKit(getSystem());
+ probe.watch(followerActor1);
+ followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg1 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg1.getActor(), followerActor1);
+
+ leader.markFollowerInActive("follower-1");
+ leader.markFollowerActive("follower-2");
+ behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ Assert.assertTrue("Behavior not instance of Leader when majority of followers are active",
+ behavior instanceof Leader);
+
+ // kill 2nd follower and leader should change to Isolated leader
+ followerActor2.tell(PoisonPill.getInstance(), null);
+ probe.watch(followerActor2);
+ followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ final Terminated termMsg2 = probe.expectMsgClass(Terminated.class);
+ assertEquals(termMsg2.getActor(), followerActor2);
+
+ leader.markFollowerInActive("follower-2");
+ return leader.handleMessage(leaderActor, new IsolatedLeaderCheck());
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowers() throws Exception {
+ logStart("testIsolatedLeaderCheckTwoFollowers");
+
+ RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE);
+
+ Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive",
+ behavior instanceof IsolatedLeader);
+ }
+
+ @Test
+ public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception {
+ logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled");
+
+ RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true));
+
+ Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled",
+ behavior instanceof Leader);
+ }
+
+ @Test
+ public void testLaggingFollowerStarvation() throws Exception {
+ logStart("testLaggingFollowerStarvation");
+ new JavaTestKit(getSystem()) {{
+ String leaderActorId = actorFactory.generateActorId("leader");
+ String follower1ActorId = actorFactory.generateActorId("follower");
+ String follower2ActorId = actorFactory.generateActorId("follower");
+
+ TestActorRef<ForwardMessageToBehaviorActor> leaderActor =
+ actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId);
+ ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId);
+ ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId);
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext(leaderActorId, getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build());
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put(follower1ActorId,
+ follower1Actor.path().toString());
+ peerAddresses.put(follower2ActorId,
+ follower2Actor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+ leaderActorContext.getTermInformation().update(1, leaderActorId);
+
+ RaftActorBehavior leader = createBehavior(leaderActorContext);
+
+ leaderActor.underlyingActor().setBehavior(leader);
+
+ for(int i=1;i<6;i++) {
+ // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733)
+ RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0));
+ assertTrue(newBehavior == leader);
+ Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+ }
+
+ // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply
+ List<SendHeartBeat> heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class);
+
+ assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()),
+ heartbeats.size() > 1);
+
+ // Check if follower-2 got AppendEntries during this time and was not starved
+ List<AppendEntries> appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class);
+
+ assertTrue(String.format("%s append entries is less than expected", appendEntries.size()),
+ appendEntries.size() > 1);
+
+ }};
+ }
+
+ @Test
+ public void testReplicationConsensusWithNonVotingFollower() {
+ logStart("testReplicationConsensusWithNonVotingFollower");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ String nonVotingFollowerId = "nonvoting-follower";
+ TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
+ Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId));
+
+ leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING);
+
+ leader = new Leader(leaderActorContext);
+
+ // Ignore initial heartbeats
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send a Replicate message and wait for AppendEntries.
+ sendReplicate(leaderActorContext, 0);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class);
+
+ // Send reply only from the voting follower and verify consensus via ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+ MessageCollectorActor.clearMessages(nonVotingFollowerActor);
+ MessageCollectorActor.clearMessages(leaderActor);
+
+ // Send another Replicate message
+ sendReplicate(leaderActorContext, 1);
+
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
+ AppendEntries.class);
+ assertEquals("Log entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+
+ // Send reply only from the non-voting follower and verify no consensus via no ApplyState.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500);
+
+ // Send reply from the voting follower and verify consensus.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInSync() {
+ logStart("testTransferLeadershipWithFollowerInSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+ MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithEmptyLog() {
+ logStart("testTransferLeadershipWithEmptyLog");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(1000, TimeUnit.SECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+
+ // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerInitiallyOutOfSync() {
+ logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Sync up the follower.
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0));
+
+ // Leader should force an election timeout
+ MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class);
+
+ verify(mockTransferCohort).transferComplete();
+ }
+
+ @Test
+ public void testTransferLeadershipWithFollowerSyncTimeout() {
+ logStart("testTransferLeadershipWithFollowerSyncTimeout");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(200, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+
+ leader = new Leader(leaderActorContext);
+
+ // Initial heartbeat
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, 0);
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
+ leader.transferLeadership(mockTransferCohort);
+
+ verify(mockTransferCohort, never()).transferComplete();
+
+ // Send heartbeats to time out the transfer.
+ for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) {
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().
+ getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+ }
+
+ verify(mockTransferCohort).abortTransfer();
+ verify(mockTransferCohort, never()).transferComplete();
+ MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100);
+ }
+
+ @Override
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext,
+ ActorRef actorRef, RaftRPC rpc) throws Exception {
+ super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
+ assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
private class MockConfigParamsImpl extends DefaultConfigParamsImpl {
- private long electionTimeOutIntervalMillis;
- private int snapshotChunkSize;
+ private final long electionTimeOutIntervalMillis;
+ private final int snapshotChunkSize;
public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
super();