X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=7ad5812f64250ccbe25a95d25e5e824d264b966f;hp=3f085df8dc3b858879981c27e223262f2a5bc40f;hb=6dbf8f82cfa9fe8c35e4085213a55cb887cc3aee;hpb=73dc2546de1386c977a9e4efd8e589e3db223cc9 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 3f085df8dc..7ad5812f64 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -1,8 +1,24 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -11,43 +27,62 @@ import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; -import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.messaging.MessageSlice; +import org.opendaylight.controller.cluster.messaging.MessageSliceReply; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; +import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohort; import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; -import org.opendaylight.controller.cluster.raft.SerializationUtils; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.RaftVersions; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.IsolatedLeaderCheck; +import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; -import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; +import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; +import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractLeaderTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; + public static final String LEADER_ID = "leader"; private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("leader")); @@ -56,11 +91,12 @@ public class LeaderTest extends AbstractLeaderTest { Props.create(ForwardMessageToBehaviorActor.class), actorFactory.generateActorId("follower")); private Leader leader; + private final short payloadVersion = 5; @Override @After - public void tearDown() throws Exception { - if(leader != null) { + public void tearDown() { + if (leader != null) { leader.close(); } @@ -73,10 +109,8 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(createActorContext()); - // handle message should return the Leader state when it receives an - // unknown message - RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo"); - Assert.assertTrue(behavior instanceof Leader); + // handle message should null when it receives an unknown message + assertNull(leader.handleMessage(followerActor, "foo")); } @Test @@ -84,32 +118,36 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setCommitIndex(-1); + actorContext.setPayloadVersion(payloadVersion); long term = 1; actorContext.getTermInformation().update(term, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader should send an immediate heartbeat with no entries as follower is inactive. - long lastIndex = actorContext.getReplicatedLog().lastIndex(); + final long lastIndex = actorContext.getReplicatedLog().lastIndex(); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("getTerm", term, appendEntries.getTerm()); assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); assertEquals("getPrevLogTerm", -1, appendEntries.getPrevLogTerm()); assertEquals("Entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); // The follower would normally reply - simulate that explicitly here. leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex - 1, term)); + FOLLOWER_ID, term, true, lastIndex - 1, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); // Sleep for the heartbeat interval so AppendEntries is sent. - Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). - getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); @@ -117,6 +155,22 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entries size", 1, appendEntries.getEntries().size()); assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); + } + + + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) { + return sendReplicate(actorContext, 1, index); + } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { + return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo")); + } + + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) { + SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); + actorContext.getReplicatedLog().append(newEntry); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); } @Test @@ -136,17 +190,105 @@ public class LeaderTest extends AbstractLeaderTest { // The follower would normally reply - simulate that explicitly here. long lastIndex = actorContext.getReplicatedLog().lastIndex(); leader.handleMessage(followerActor, new AppendEntriesReply( - FOLLOWER_ID, term, true, lastIndex, term)); + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); followerActor.underlyingActor().clear(); - MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - 1, lastIndex + 1, payload); - actorContext.getReplicatedLog().append(newEntry); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(null, null, newEntry)); + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1); + + // State should not change + assertTrue(raftBehavior instanceof Leader); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); + } + + @Test + public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception { + logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setCommitIndex(-1); + actorContext.setLastApplied(-1); + + // The raft context is initialized with a couple log entries. However the commitIndex + // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't + // committed and applied. Now it regains leadership with a higher term (2). + long prevTerm = actorContext.getTermInformation().getCurrentTerm(); + long newTerm = prevTerm + 1; + actorContext.getTermInformation().update(newTerm, ""); + + leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower replies with the leader's current last index and term, simulating that it is + // up to date with the leader. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, lastIndex, prevTerm, (short)0)); + + // The commit index should not get updated even though consensus was reached. This is b/c the + // last entry's term does match the current term. As per §5.4.1, "Raft never commits log entries + // from previous terms by counting replicas". + assertEquals("Commit Index", -1, actorContext.getCommitIndex()); + + followerActor.underlyingActor().clear(); + + // Now replicate a new entry with the new term 2. + long newIndex = lastIndex + 1; + sendReplicate(actorContext, newTerm, newIndex); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm()); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + + // The follower replies with success. The leader should now update the commit index to the new index + // as per §5.4.1 "once an entry from the current term is committed by counting replicas, then all + // prior entries are committed indirectly". + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, newTerm, true, newIndex, newTerm, (short)0)); + + assertEquals("Commit Index", newIndex, actorContext.getCommitIndex()); + } + + @Test + public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { + logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setRaftPolicy(createRaftPolicy(true, true)); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short) 0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + RaftActorBehavior raftBehavior = sendReplicate(actorContext, lastIndex + 1); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -157,9 +299,222 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entries size", 1, appendEntries.getEntries().size()); assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); - assertEquals("Entry payload", payload, appendEntries.getEntries().get(0).getData()); + assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); + assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex()); + } + + @Test + public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception { + logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for (int i = 0; i < 5; i++) { + sendReplicate(actorContext, lastIndex + i + 1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect only 1 message to be sent because of two reasons, + // - an append entries reply was not received + // - the heartbeat interval has not expired + // In this scenario if multiple messages are sent they would likely be duplicates + assertEquals("The number of append entries collected should be 1", 1, allMessages.size()); + } + + @Test + public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception { + logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(5, TimeUnit.SECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for (int i = 0; i < 3; i++) { + sendReplicate(actorContext, lastIndex + i + 1); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); + + } + + for (int i = 3; i < 5; i++) { + sendReplicate(actorContext, lastIndex + i + 1); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would + // get sent to the follower - but not the 5th + assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + + for (int i = 0; i < 4; i++) { + long expected = allMessages.get(i).getEntries().get(0).getIndex(); + assertEquals(expected, i + 2); + } + } + + @Test + public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception { + logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(500, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + sendReplicate(actorContext, lastIndex + 1); + + // Wait slightly longer than heartbeat duration + Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(1, allMessages.get(0).getEntries().size()); + assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex()); + assertEquals(1, allMessages.get(1).getEntries().size()); + assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex()); + + } + + @Test + public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception { + logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + for (int i = 0; i < 3; i++) { + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + } + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 3", 3, allMessages.size()); + } + + @Test + public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception { + logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate"); + + MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setConfigParams(new DefaultConfigParamsImpl() { + @Override + public FiniteDuration getHeartBeatInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + long term = 1; + actorContext.getTermInformation().update(term, ""); + + leader = new Leader(actorContext); + + // Leader will send an immediate heartbeat - ignore it. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + // The follower would normally reply - simulate that explicitly here. + long lastIndex = actorContext.getReplicatedLog().lastIndex(); + leader.handleMessage(followerActor, new AppendEntriesReply( + FOLLOWER_ID, term, true, lastIndex, term, (short)0)); + assertEquals("isFollowerActive", true, leader.getFollower(FOLLOWER_ID).isFollowerActive()); + + followerActor.underlyingActor().clear(); + + Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + sendReplicate(actorContext, lastIndex + 1); + + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); + + assertEquals(0, allMessages.get(0).getEntries().size()); + assertEquals(1, allMessages.get(1).getEntries().size()); } + @Test public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { logStart("testHandleReplicateMessageWhenThereAreNoFollowers"); @@ -172,13 +527,14 @@ public class LeaderTest extends AbstractLeaderTest { long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; long term = actorContext.getTermInformation().getCurrentTerm(); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - term, newLogIndex, new MockRaftActorContext.MockPayload("foo")); + ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry( + newLogIndex, term, new MockRaftActorContext.MockPayload("foo")); actorContext.getReplicatedLog().append(newEntry); + final Identifier id = new MockIdentifier("state-id"); RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(leaderActor, "state-id", newEntry)); + new Replicate(leaderActor, id, newEntry, true)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -191,7 +547,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActor, ApplyState.class); assertEquals("ApplyState count", newLogIndex, applyStateList.size()); - for(int i = 0; i <= newLogIndex - 1; i++ ) { + for (int i = 0; i <= newLogIndex - 1; i++) { ApplyState applyState = applyStateList.get(i); assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex()); assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm()); @@ -199,14 +555,14 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState last = applyStateList.get((int) newLogIndex - 1); assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); - assertEquals("getIdentifier", "state-id", last.getIdentifier()); + assertEquals("getIdentifier", id, last.getIdentifier()); } @Test public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception { logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot"); - MockRaftActorContext actorContext = createActorContextWithFollower(); + final MockRaftActorContext actorContext = createActorContextWithFollower(); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -216,33 +572,33 @@ public class LeaderTest extends AbstractLeaderTest { //clears leaders log actorContext.getReplicatedLog().removeFrom(0); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int newEntryIndex = 4; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; - final int currentTerm = 2; // set the snapshot variables in replicatedlog actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); //set follower timeout to 2 mins, helps during debugging actorContext.setConfigParams(new MockConfigParamsImpl(120000L, 10)); leader = new Leader(actorContext); - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); + leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); + leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); //send first chunk and no InstallSnapshotReply received yet fts.getNextChunk(); @@ -251,29 +607,27 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); - AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - - AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto); + AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty()); //InstallSnapshotReply received fts.markSendStatus(true); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); InstallSnapshot is = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); - assertEquals(snapshotIndex, is.getLastIncludedIndex()); + assertEquals(commitIndex, is.getLastIncludedIndex()); } @Test public void testSendAppendEntriesSnapshotScenario() throws Exception { logStart("testSendAppendEntriesSnapshotScenario"); - MockRaftActorContext actorContext = createActorContextWithFollower(); + final MockRaftActorContext actorContext = createActorContextWithFollower(); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -300,20 +654,22 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + SimpleReplicatedLogEntry entry = + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); + actorContext.getReplicatedLog().append(entry); + //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, "state-id", entry)); + leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertTrue(raftBehavior instanceof Leader); - MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); } @Test @@ -322,11 +678,6 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - //clears leaders log actorContext.getReplicatedLog().removeFrom(0); @@ -348,10 +699,10 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(Optional.absent()); + leader.setSnapshot(null); // new entry - ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); actorContext.getReplicatedLog().append(entry); @@ -359,64 +710,210 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); - CaptureSnapshot cs = MessageCollectorActor.expectFirstMatching(leaderActor, CaptureSnapshot.class); + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); - List captureSnapshots = MessageCollectorActor.getAllMatching(leaderActor, CaptureSnapshot.class); - assertEquals("CaptureSnapshot should not get invoked when initiate is in progress", 1, captureSnapshots.size()); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @Test - public void testInstallSnapshot() throws Exception { - logStart("testInstallSnapshot"); + public void testInitiateForceInstallSnapshot() throws Exception { + logStart("testInitiateForceInstallSnapshot"); MockRaftActorContext actorContext = createActorContextWithFollower(); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - - //clears leaders log - actorContext.getReplicatedLog().removeFrom(0); - final int followersLastIndex = 2; - final int snapshotIndex = 3; - final int snapshotTerm = 1; + final int snapshotIndex = -1; + final int newEntryIndex = 4; + final int snapshotTerm = -1; final int currentTerm = 2; // set the snapshot variables in replicatedlog actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); - actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setLastApplied(3); actorContext.setCommitIndex(followersLastIndex); + actorContext.getReplicatedLog().removeFrom(0); + + AtomicReference> installSnapshotStream = new AtomicReference<>(); + actorContext.setCreateSnapshotProcedure(installSnapshotStream::set); + leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); - // Ignore initial heartbeat. + // Leader will send an immediate heartbeat - ignore it. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new SendInstallSnapshot(toByteString(leadersSnapshot))); + // set the snapshot as absent and check if capture-snapshot is invoked. + leader.setSnapshot(null); - assertTrue(raftBehavior instanceof Leader); + for (int i = 0; i < 4; i++) { + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1, + new MockRaftActorContext.MockPayload("X" + i))); + } + + // new entry + SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, + new MockRaftActorContext.MockPayload("D")); + + actorContext.getReplicatedLog().append(entry); + + //update follower timestamp + leader.markFollowerActive(FOLLOWER_ID); + + // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets + // installed with a SendInstallSnapshot + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + + assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); + + CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); + assertEquals(3, cs.getLastAppliedIndex()); + assertEquals(1, cs.getLastAppliedTerm()); + assertEquals(4, cs.getLastIndex()); + assertEquals(2, cs.getLastTerm()); + + assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get()); + assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent()); + + MessageCollectorActor.clearMessages(followerActor); + + // Sending Replicate message should not initiate another capture since the first is in progress. + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + + // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + + // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. + final byte[] bytes = new byte[]{1, 2, 3}; + installSnapshotStream.get().get().write(bytes); + actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(), + Runtime.getRuntime().totalMemory()); + MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + + // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk. + MessageCollectorActor.clearMessages(followerActor); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200); + } + + + @Test + public void testInstallSnapshot() throws Exception { + logStart("testInstallSnapshot"); + + final MockRaftActorContext actorContext = createActorContextWithFollower(); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + //clears leaders log + actorContext.getReplicatedLog().removeFrom(0); + + final int lastAppliedIndex = 3; + final int snapshotIndex = 2; + final int snapshotTerm = 1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setCommitIndex(lastAppliedIndex); + actorContext.setLastApplied(lastAppliedIndex); + + leader = new Leader(actorContext); + + // Initial heartbeat. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); + + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); + + assertTrue(raftBehavior instanceof Leader); // check if installsnapshot gets called with the correct values. - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertNotNull(installSnapshot.getData()); - assertEquals(snapshotIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); + assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); + + assertEquals(currentTerm, installSnapshot.getTerm()); + } + + @Test + public void testForceInstallSnapshot() throws Exception { + logStart("testForceInstallSnapshot"); + + final MockRaftActorContext actorContext = createActorContextWithFollower(); + + Map leadersSnapshot = new HashMap<>(); + leadersSnapshot.put("1", "A"); + leadersSnapshot.put("2", "B"); + leadersSnapshot.put("3", "C"); + + final int lastAppliedIndex = 3; + final int snapshotIndex = -1; + final int snapshotTerm = -1; + final int currentTerm = 2; + + // set the snapshot variables in replicatedlog + actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); + actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm); + actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); + actorContext.setCommitIndex(lastAppliedIndex); + actorContext.setLastApplied(lastAppliedIndex); + + leader = new Leader(actorContext); + + // Initial heartbeat. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(-1); + + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); + + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); + + assertTrue(raftBehavior instanceof Leader); + + // check if installsnapshot gets called with the correct values. + + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); + + assertNotNull(installSnapshot.getData()); + assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm()); assertEquals(currentTerm, installSnapshot.getTerm()); @@ -428,14 +925,18 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); + + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); // Ignore initial heartbeat. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -452,10 +953,14 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); - while(!fts.isLastChunk(fts.getChunkIndex())) { + leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); + leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); + while (!fts.isLastChunk(fts.getChunkIndex())) { fts.getNextChunk(); fts.incrementChunkIndex(); } @@ -468,13 +973,13 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue(raftBehavior instanceof Leader); - assertEquals(0, leader.followerSnapshotSize()); assertEquals(1, leader.followerLogSize()); FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID); assertNotNull(fli); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex, fli.getMatchIndex()); - assertEquals(snapshotIndex + 1, fli.getNextIndex()); + assertNull(fli.getInstallSnapshotState()); + assertEquals(commitIndex, fli.getMatchIndex()); + assertEquals(commitIndex + 1, fli.getNextIndex()); + assertFalse(leader.hasSnapshot()); } @Test @@ -483,12 +988,12 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){ + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() { @Override public int getSnapshotChunkSize() { return 50; @@ -498,9 +1003,13 @@ public class LeaderTest extends AbstractLeaderTest { configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); actorContext.setConfigParams(configParams); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); + + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -513,11 +1022,14 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -544,32 +1056,35 @@ public class LeaderTest extends AbstractLeaderTest { installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class); - Assert.assertNull(installSnapshot); + assertNull(installSnapshot); } @Test - public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{ + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex"); MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; - actorContext.setConfigParams(new DefaultConfigParamsImpl(){ + actorContext.setConfigParams(new DefaultConfigParamsImpl() { @Override public int getSnapshotChunkSize() { return 50; } }); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -581,12 +1096,15 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -599,7 +1117,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -613,8 +1131,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext actorContext = createActorContextWithFollower(); - final int followersLastIndex = 2; - final int snapshotIndex = 3; + final int commitIndex = 3; + final int snapshotIndex = 2; final int snapshotTerm = 1; final int currentTerm = 2; @@ -625,10 +1143,13 @@ public class LeaderTest extends AbstractLeaderTest { } }); - actorContext.setCommitIndex(followersLastIndex); + actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); + leader.getFollower(FOLLOWER_ID).setNextIndex(0); + Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); leadersSnapshot.put("2", "B"); @@ -640,17 +1161,21 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Optional.of(bs)); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(bs)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); + assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, + installSnapshot.getLastChunkHashCode().get().intValue()); - int hashCode = installSnapshot.getData().hashCode(); + final int hashCode = Arrays.hashCode(installSnapshot.getData()); followerActor.underlyingActor().clear(); @@ -665,19 +1190,8 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testFollowerToSnapshotLogic() { - logStart("testFollowerToSnapshotLogic"); - - MockRaftActorContext actorContext = createActorContext(); - - actorContext.setConfigParams(new DefaultConfigParamsImpl() { - @Override - public int getSnapshotChunkSize() { - return 50; - } - }); - - leader = new Leader(actorContext); + public void testLeaderInstallSnapshotState() throws IOException { + logStart("testLeaderInstallSnapshotState"); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -687,22 +1201,22 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); byte[] barray = bs.toByteArray(); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test"); + fts.setSnapshotBytes(ByteSource.wrap(barray)); assertEquals(bs.size(), barray.length); - int chunkIndex=0; - for (int i=0; i < barray.length; i = i + 50) { - int j = i + 50; + int chunkIndex = 0; + for (int i = 0; i < barray.length; i = i + 50) { + int length = i + 50; chunkIndex++; if (i + 50 > barray.length) { - j = barray.length; + length = barray.length; } - ByteString chunk = fts.getNextChunk(); - assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.size()); + byte[] chunk = fts.getNextChunk(); + assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length); assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex()); fts.markSendStatus(true); @@ -712,10 +1226,11 @@ public class LeaderTest extends AbstractLeaderTest { } assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); + fts.close(); } - @Override protected RaftActorBehavior createBehavior( - RaftActorContext actorContext) { + @Override + protected Leader createBehavior(final RaftActorContext actorContext) { return new Leader(actorContext); } @@ -725,8 +1240,18 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - protected MockRaftActorContext createActorContext(ActorRef actorRef) { - return createActorContext("leader", actorRef); + protected MockRaftActorContext createActorContext(final ActorRef actorRef) { + return createActorContext(LEADER_ID, actorRef); + } + + private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) { + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); + configParams.setElectionTimeoutFactor(100000); + MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef); + context.setConfigParams(configParams); + context.setPayloadVersion(payloadVersion); + return context; } private MockRaftActorContext createActorContextWithFollower() { @@ -736,25 +1261,26 @@ public class LeaderTest extends AbstractLeaderTest { return actorContext; } - private MockRaftActorContext createActorContext(String id, ActorRef actorRef) { - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); - configParams.setElectionTimeoutFactor(100000); - MockRaftActorContext context = new MockRaftActorContext(id, getSystem(), actorRef); - context.setConfigParams(configParams); - return context; + private MockRaftActorContext createFollowerActorContextWithLeader() { + MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl(); + followerConfig.setElectionTimeoutFactor(10000); + followerActorContext.setConfigParams(followerConfig); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + return followerActorContext; } @Test public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); - MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + final MockRaftActorContext leaderActorContext = createActorContextWithFollower(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -778,10 +1304,11 @@ public class LeaderTest extends AbstractLeaderTest { followerActorContext.setCommitIndex(1); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(-1, appendEntries.getLeaderCommit()); assertEquals(0, appendEntries.getEntries().size()); assertEquals(0, appendEntries.getPrevLogIndex()); @@ -802,17 +1329,19 @@ public class LeaderTest extends AbstractLeaderTest { public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex"); - MockRaftActorContext leaderActorContext = createActorContext(); + final MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); + Map leaderPeerAddresses = new HashMap<>(); + leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); - leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setPeerAddresses(leaderPeerAddresses); leaderActorContext.getReplicatedLog().removeFrom(0); @@ -834,7 +1363,7 @@ public class LeaderTest extends AbstractLeaderTest { // Initial heartbeat AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(-1, appendEntries.getLeaderCommit()); assertEquals(0, appendEntries.getEntries().size()); assertEquals(0, appendEntries.getPrevLogIndex()); @@ -853,7 +1382,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -872,26 +1401,298 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyFailure(){ - logStart("testHandleAppendEntriesReplyFailure"); + public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() { + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + long leaderCommitIndex = 2; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + followerActorContext.setCommitIndex(0); + followerActorContext.setLastApplied(0); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); leader = new Leader(leaderActorContext); - // Send initial heartbeat reply with last index. - leader.handleAppendEntriesReply(followerActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 10, 1)); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent. + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersThirdLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); - assertEquals("getNextIndex", 11, followerInfo.getNextIndex()); + assertEquals("getNextIndex", 3, followerInfo.getNextIndex()); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, false, 10, 1); + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); - RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + assertEquals("Follower's commit index", 2, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 2, followerActorContext.getReplicatedLog().lastIndex()); + } - assertEquals(RaftState.Leader, raftActorBehavior.state()); + @Test + public void testHandleAppendEntriesReplyFailureWithFollowersLogEmpty() { + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogEmpty"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 1).build()); + long leaderCommitIndex = 1; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); + } + + @Test + public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() { + logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + long leaderCommitIndex = 1; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 1, 1).build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 1); + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 2, followerInfo.getNextIndex()); + + List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); + + ApplyState applyState = applyStateList.get(0); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + applyState = applyStateList.get(1); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); + assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), + applyState.getReplicatedLogEntry().getData()); + + assertEquals("Follower's commit index", 1, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 1, followerActorContext.getReplicatedLog().lastIndex()); + assertEquals("Follower's lastTerm", 2, followerActorContext.getReplicatedLog().lastTerm()); + } + + @Test + public void testHandleAppendEntriesReplyWithNewerTerm() { + logStart("testHandleAppendEntriesReplyWithNewerTerm"); + + MockRaftActorContext leaderActorContext = createActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(10000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + + leader = new Leader(leaderActorContext); + leaderActor.underlyingActor().setBehavior(leader); + leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender()); - assertEquals("getNextIndex", 10, followerInfo.getNextIndex()); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + assertEquals(false, appendEntriesReply.isSuccess()); + assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state()); + + MessageCollectorActor.clearMessages(leaderActor); + } + + @Test + public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() { + logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled"); + + MockRaftActorContext leaderActorContext = createActorContext(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(10000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 2, 2).build()); + leaderActorContext.setRaftPolicy(createRaftPolicy(false, false)); + + leader = new Leader(leaderActorContext); + leaderActor.underlyingActor().setBehavior(leader); + leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender()); + + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + assertEquals(false, appendEntriesReply.isSuccess()); + assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state()); + + MessageCollectorActor.clearMessages(leaderActor); } @Test @@ -909,7 +1710,12 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); - AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1); + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + + assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); + assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion()); + + AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -932,17 +1738,22 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState applyState = applyStateList.get(0); assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); + + assertEquals(2, followerInfo.getMatchIndex()); + assertEquals(3, followerInfo.getNextIndex()); + assertEquals(payloadVersion, followerInfo.getPayloadVersion()); + assertEquals(RaftVersions.CURRENT_VERSION, followerInfo.getRaftVersion()); } @Test - public void testHandleAppendEntriesReplyUnknownFollower(){ + public void testHandleAppendEntriesReplyUnknownFollower() { logStart("testHandleAppendEntriesReplyUnknownFollower"); MockRaftActorContext leaderActorContext = createActorContext(); leader = new Leader(leaderActorContext); - AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1); + AppendEntriesReply reply = new AppendEntriesReply("unkown-follower", 1, false, 10, 1, (short)0); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -950,7 +1761,93 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleRequestVoteReply(){ + public void testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded() { + logStart("testFollowerCatchUpWithAppendEntriesMaxDataSizeExceeded"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2); + + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build()); + long leaderCommitIndex = 3; + leaderActorContext.setCommitIndex(leaderCommitIndex); + leaderActorContext.setLastApplied(leaderCommitIndex); + + final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + + followerActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + followerActorContext.setCommitIndex(-1); + followerActorContext.setLastApplied(-1); + + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); + + leader = new Leader(leaderActorContext); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(leaderActor); + + // Verify initial AppendEntries sent with the leader's current commit index. + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Log entries size", 0, appendEntries.getEntries().size()); + assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); + + leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); + + leader.handleMessage(followerActor, appendEntriesReply); + + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, + AppendEntries.class, 2); + MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); + + appendEntries = appendEntriesList.get(0); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersFirstLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersSecondLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + appendEntries = appendEntriesList.get(1); + assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); + assertEquals("Log entries size", 2, appendEntries.getEntries().size()); + + assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry data", leadersThirdLogEntry.getData(), + appendEntries.getEntries().get(0).getData()); + assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry data", leadersFourthLogEntry.getData(), + appendEntries.getEntries().get(1).getData()); + + FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); + assertEquals("getNextIndex", 4, followerInfo.getNextIndex()); + + MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 4); + + assertEquals("Follower's commit index", 3, followerActorContext.getCommitIndex()); + assertEquals("Follower's lastIndex", 3, followerActorContext.getReplicatedLog().lastIndex()); + } + + @Test + public void testHandleRequestVoteReply() { logStart("testHandleRequestVoteReply"); MockRaftActorContext leaderActorContext = createActorContext(); @@ -975,201 +1872,513 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); leader = new Leader(leaderActorContext); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue(behavior instanceof Leader); + RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue(newBehavior instanceof Leader); } @Test - public void testIsolatedLeaderCheckTwoFollowers() throws Exception { - logStart("testIsolatedLeaderCheckTwoFollowers"); + public void testIsolatedLeaderCheckNoVotingFollowers() { + logStart("testIsolatedLeaderCheckNoVotingFollowers"); - new JavaTestKit(getSystem()) {{ + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); - ActorRef followerActor1 = getTestActor(); - ActorRef followerActor2 = getTestActor(); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING); - MockRaftActorContext leaderActorContext = createActorContext(); + leader = new Leader(leaderActorContext); + leader.getFollower(FOLLOWER_ID).markFollowerActive(); + RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Expected Leader", newBehavior instanceof Leader); + } - Map peerAddresses = new HashMap<>(); - peerAddresses.put("follower-1", followerActor1.path().toString()); - peerAddresses.put("follower-2", followerActor2.path().toString()); + private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) { + ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1"); + ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2"); - leaderActorContext.setPeerAddresses(peerAddresses); + MockRaftActorContext leaderActorContext = createActorContext(); - leader = new Leader(leaderActorContext); + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-1", followerActor1.path().toString()); + peerAddresses.put("follower-2", followerActor2.path().toString()); - leader.markFollowerActive("follower-1"); - leader.markFollowerActive("follower-2"); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when all followers are active", - behavior instanceof Leader); + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.setRaftPolicy(raftPolicy); - // kill 1 follower and verify if that got killed - final JavaTestKit probe = new JavaTestKit(getSystem()); - probe.watch(followerActor1); - followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); - final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); - assertEquals(termMsg1.getActor(), followerActor1); + leader = new Leader(leaderActorContext); - leader.markFollowerInActive("follower-1"); - leader.markFollowerActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", - behavior instanceof Leader); + leader.markFollowerActive("follower-1"); + leader.markFollowerActive("follower-2"); + RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader); + + // kill 1 follower and verify if that got killed + final JavaTestKit probe = new JavaTestKit(getSystem()); + probe.watch(followerActor1); + followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); + final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); + assertEquals(termMsg1.getActor(), followerActor1); + + leader.markFollowerInActive("follower-1"); + leader.markFollowerActive("follower-2"); + newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Behavior not instance of Leader when majority of followers are active", + newBehavior instanceof Leader); + + // kill 2nd follower and leader should change to Isolated leader + followerActor2.tell(PoisonPill.getInstance(), null); + probe.watch(followerActor2); + followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender()); + final Terminated termMsg2 = probe.expectMsgClass(Terminated.class); + assertEquals(termMsg2.getActor(), followerActor2); + + leader.markFollowerInActive("follower-2"); + return leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + } + + @Test + public void testIsolatedLeaderCheckTwoFollowers() throws Exception { + logStart("testIsolatedLeaderCheckTwoFollowers"); - // kill 2nd follower and leader should change to Isolated leader - followerActor2.tell(PoisonPill.getInstance(), null); - probe.watch(followerActor2); - followerActor2.tell(PoisonPill.getInstance(), ActorRef.noSender()); - final Terminated termMsg2 = probe.expectMsgClass(Terminated.class); - assertEquals(termMsg2.getActor(), followerActor2); + RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); - leader.markFollowerInActive("follower-2"); - behavior = leader.handleMessage(leaderActor, new IsolatedLeaderCheck()); - Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", - behavior instanceof IsolatedLeader); - }}; + assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", + newBehavior instanceof IsolatedLeader); } + @Test + public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception { + logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled"); + + RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); + + assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", + newBehavior instanceof Leader); + } @Test - public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { - logStart("testAppendEntryCallAtEndofAppendEntryReply"); + public void testLaggingFollowerStarvation() throws Exception { + logStart("testLaggingFollowerStarvation"); - MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + String leaderActorId = actorFactory.generateActorId("leader"); + String follower1ActorId = actorFactory.generateActorId("follower"); + String follower2ActorId = actorFactory.generateActorId("follower"); + + final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); + final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - //configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); + configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); leaderActorContext.setConfigParams(configParams); - MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); - followerActorContext.setConfigParams(configParams); + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1ActorId, + follower1Actor.path().toString()); + peerAddresses.put(follower2ActorId, + follower2Actor.path().toString()); - Follower follower = new Follower(followerActorContext); - followerActor.underlyingActor().setBehavior(follower); + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.getTermInformation().update(1, leaderActorId); - leaderActorContext.getReplicatedLog().removeFrom(0); + leader = createBehavior(leaderActorContext); + + leaderActor.underlyingActor().setBehavior(leader); + + for (int i = 1; i < 6; i++) { + // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, + new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0)); + assertTrue(newBehavior == leader); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } + + // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply + List heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); + + assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()), + heartbeats.size() > 1); + + // Check if follower-2 got AppendEntries during this time and was not starved + List appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class); + + assertTrue(String.format("%s append entries is less than expected", appendEntries.size()), + appendEntries.size() > 1); + } + + @Test + public void testReplicationConsensusWithNonVotingFollower() { + logStart("testReplicationConsensusWithNonVotingFollower"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leaderActorContext.setCommitIndex(-1); leaderActorContext.setLastApplied(-1); - followerActorContext.getReplicatedLog().removeFrom(0); - followerActorContext.setCommitIndex(-1); - followerActorContext.setLastApplied(-1); + String nonVotingFollowerId = "nonvoting-follower"; + ActorRef nonVotingFollowerActor = actorFactory.createActor( + MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId)); + + leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), + VotingState.NON_VOTING); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching( - leaderActor, AppendEntriesReply.class); + // Ignore initial heartbeats + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class); - leader.handleMessage(followerActor, appendEntriesReply); + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(nonVotingFollowerActor); + MessageCollectorActor.clearMessages(leaderActor); - // Clear initial heartbeat messages + // Send a Replicate message and wait for AppendEntries. + sendReplicate(leaderActorContext, 0); - leaderActor.underlyingActor().clear(); - followerActor.underlyingActor().clear(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class); - // create 3 entries - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); - leaderActorContext.setCommitIndex(1); - leaderActorContext.setLastApplied(1); + // Send reply only from the voting follower and verify consensus via ApplyState. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), - TimeUnit.MILLISECONDS); + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); - leader.handleMessage(leaderActor, new SendHeartBeat()); + leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 0, 1, (short)0)); - AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + MessageCollectorActor.clearMessages(nonVotingFollowerActor); + MessageCollectorActor.clearMessages(leaderActor); - // Should send first log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(0, appendEntries.getEntries().get(0).getIndex()); - assertEquals(-1, appendEntries.getPrevLogIndex()); + // Send another Replicate message + sendReplicate(leaderActorContext, 1); - appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, + AppendEntries.class); + assertEquals("Log entries size", 1, appendEntries.getEntries().size()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); - assertEquals(1, appendEntriesReply.getLogLastTerm()); - assertEquals(0, appendEntriesReply.getLogLastIndex()); + // Send reply only from the non-voting follower and verify no consensus via no ApplyState. + leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0)); - followerActor.underlyingActor().clear(); + MessageCollectorActor.assertNoneMatching(leaderActor, ApplyState.class, 500); - leader.handleAppendEntriesReply(followerActor, appendEntriesReply); + // Send reply from the voting follower and verify consensus. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); - appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + } - // Should send second log entry - assertEquals(1, appendEntries.getLeaderCommit()); - assertEquals(1, appendEntries.getEntries().get(0).getIndex()); + @Test + public void testTransferLeadershipWithFollowerInSync() { + logStart("testTransferLeadershipWithFollowerInSync"); - follower.close(); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + leaderActorContext.setLastApplied(-1); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + MessageCollectorActor.expectFirstMatching(leaderActor, ApplyState.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); + + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); + + verify(mockTransferCohort).transferComplete(); } @Test - public void testLaggingFollowerStarvation() throws Exception { - logStart("testLaggingFollowerStarvation"); - new JavaTestKit(getSystem()) {{ - String leaderActorId = actorFactory.generateActorId("leader"); - String follower1ActorId = actorFactory.generateActorId("follower"); - String follower2ActorId = actorFactory.generateActorId("follower"); + public void testTransferLeadershipWithEmptyLog() { + logStart("testTransferLeadershipWithEmptyLog"); - TestActorRef leaderActor = - actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId); - ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); - ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); - MockRaftActorContext leaderActorContext = - new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); - configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); - leaderActorContext.setConfigParams(configParams); + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + leader.transferLeadership(mockTransferCohort); - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(follower1ActorId, - follower1Actor.path().toString()); - peerAddresses.put(follower2ActorId, - follower2Actor.path().toString()); + // Expect a final AppendEntries to ensure the follower's lastApplied index is up-to-date + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - leaderActorContext.setPeerAddresses(peerAddresses); - leaderActorContext.getTermInformation().update(1, leaderActorId); + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); - RaftActorBehavior leader = createBehavior(leaderActorContext); + verify(mockTransferCohort).transferComplete(); + } - leaderActor.underlyingActor().setBehavior(leader); + @Test + public void testTransferLeadershipWithFollowerInitiallyOutOfSync() { + logStart("testTransferLeadershipWithFollowerInitiallyOutOfSync"); - for(int i=1;i<6;i++) { - // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) - RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1)); - assertTrue(newBehavior == leader); - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); - } + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Sync up the follower. + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); + + // Leader should force an election timeout + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); + + verify(mockTransferCohort).transferComplete(); + } + + @Test + public void testTransferLeadershipWithFollowerSyncTimeout() { + logStart("testTransferLeadershipWithFollowerSyncTimeout"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(200, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(2); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, 0); + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + + MessageCollectorActor.clearMessages(followerActor); + + RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + leader.transferLeadership(mockTransferCohort); + + verify(mockTransferCohort, never()).transferComplete(); + + // Send heartbeats to time out the transfer. + for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + } + + verify(mockTransferCohort).abortTransfer(); + verify(mockTransferCohort, never()).transferComplete(); + MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); + } + + @Test + public void testReplicationWithPayloadSizeThatExceedsThreshold() { + logStart("testReplicationWithPayloadSizeThatExceedsThreshold"); + + final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1, + Arrays.asList(new SimpleReplicatedLogEntry(0, 1, + new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length; + final MockRaftActorContext.MockPayload largePayload = + new MockRaftActorContext.MockPayload("large", serializedSize); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(300, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Send initial heartbeat reply so follower is marked active + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + // Send normal payload first to prime commit index. + final long term = leaderActorContext.getTermInformation().getCurrentTerm(); + sendReplicate(leaderActorContext, term, 0); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex()); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0)); + assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex()); + MessageCollectorActor.clearMessages(followerActor); + + // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced. + sendReplicate(leaderActorContext, term, 1, largePayload); + + MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex()); + assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices()); + + final Identifier slicingId = messageSlice.getIdentifier(); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + MessageCollectorActor.clearMessages(followerActor); + + // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress. + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + MessageCollectorActor.clearMessages(followerActor); + + // Simulate the MessageSliceReply's and AppendEntriesReply from the follower. + + leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor)); + messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex()); + + leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor)); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0)); + + MessageCollectorActor.clearMessages(followerActor); + + // Send another normal payload. + + sendReplicate(leaderActorContext, term, 2); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit()); + } + + @Test + public void testLargePayloadSlicingExpiration() { + logStart("testLargePayloadSlicingExpiration"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + final long term = leaderActorContext.getTermInformation().getCurrentTerm(); + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Send initial heartbeat reply so follower is marked active + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large", + leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1)); + MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + + // Sleep for at least 3 * election timeout so the slicing state expires. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS); + MessageCollectorActor.clearMessages(followerActor); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); - // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply - List heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); + MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300); + MessageCollectorActor.clearMessages(followerActor); - assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()), - heartbeats.size() > 1); + // Send an AppendEntriesReply - this should restart the slicing. - // Check if follower-2 got AppendEntries during this time and was not starved - List appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class); + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS); - assertTrue(String.format("%s append entries is less than expected", appendEntries.size()), - appendEntries.size() > 1); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0)); - }}; + MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, - ActorRef actorRef, RaftRPC rpc) throws Exception { + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, + final ActorRef actorRef, final RaftRPC rpc) { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); } @@ -1179,8 +2388,7 @@ public class LeaderTest extends AbstractLeaderTest { private final long electionTimeOutIntervalMillis; private final int snapshotChunkSize; - public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { - super(); + MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) { this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis; this.snapshotChunkSize = snapshotChunkSize; }