X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=1b15ecb135a89f87c212ecf3a080cf8181ba921e;hp=83b9ad3ec7b7a5c7033f876c45d4db2788f50716;hb=ecccb6d5b43dd73aef0d2d19349d19ee9b4728f7;hpb=e316a0ef36279a72767703d190f38a39d7d49395 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 83b9ad3ec7..1b15ecb135 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,160 +1,430 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; -import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.duration.FiniteDuration; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +public class FollowerTest extends AbstractRaftActorBehaviorTest { -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; + private final TestActorRef followerActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower")); -public class FollowerTest extends AbstractRaftActorBehaviorTest { + private final TestActorRef leaderActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader")); + + private RaftActorBehavior follower; - private final ActorRef followerActor = getSystem().actorOf(Props.create( - DoNothingActor.class)); + private final short payloadVersion = 5; + @Override + @After + public void tearDown() throws Exception { + if(follower != null) { + follower.close(); + } + + super.tearDown(); + } - @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { - return new Follower(actorContext); + @Override + protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { + return new TestFollower(actorContext); } - @Override protected RaftActorContext createActorContext() { + @Override + protected MockRaftActorContext createActorContext() { return createActorContext(followerActor); } - protected RaftActorContext createActorContext(ActorRef actorRef){ - return new MockRaftActorContext("test", getSystem(), actorRef); + @Override + protected MockRaftActorContext createActorContext(ActorRef actorRef){ + MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); + context.setPayloadVersion(payloadVersion ); + return context; + } + + private static int getElectionTimeoutCount(RaftActorBehavior follower){ + if(follower instanceof TestFollower){ + return ((TestFollower) follower).getElectionTimeoutCount(); + } + return -1; } @Test public void testThatAnElectionTimeoutIsTriggered(){ - new JavaTestKit(getSystem()) {{ - - new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) { - protected void run() { - - Follower follower = new Follower(createActorContext(getTestActor())); - - final Boolean out = new ExpectMsg(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof ElectionTimeout) { - return true; - } else { - throw noMatch(); - } - } - }.get(); + MockRaftActorContext actorContext = createActorContext(); + follower = new Follower(actorContext); - assertEquals(true, out); - } - }; - }}; + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class, + actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis()); } @Test public void testHandleElectionTimeout(){ - RaftActorContext raftActorContext = createActorContext(); - Follower follower = - new Follower(raftActorContext); + logStart("testHandleElectionTimeout"); - RaftActorBehavior raftBehavior = - follower.handleMessage(followerActor, new ElectionTimeout()); + follower = new Follower(createActorContext()); + + RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout()); assertTrue(raftBehavior instanceof Candidate); } @Test public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ - new JavaTestKit(getSystem()) {{ + logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull"); + + RaftActorContext context = createActorContext(); + long term = 1000; + context.getTermInformation().update(term, null); + + follower = createBehavior(context); + + follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999)); + + RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); + + assertEquals("isVoteGranted", true, reply.isVoteGranted()); + assertEquals("getTerm", term, reply.getTerm()); + assertEquals("schedule election", 1, getElectionTimeoutCount(follower)); + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId"); + + RaftActorContext context = createActorContext(); + long term = 1000; + context.getTermInformation().update(term, "test"); + + follower = createBehavior(context); + + follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999)); + + RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); + + assertEquals("isVoteGranted", false, reply.isVoteGranted()); + assertEquals("schedule election", 0, getElectionTimeoutCount(follower)); + } + + + @Test + public void testHandleFirstAppendEntries() throws Exception { + logStart("testHandleFirstAppendEntries"); - new Within(duration("1 seconds")) { - protected void run() { + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); - RaftActorContext context = createActorContext(getTestActor()); + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); - context.getTermInformation().update(1000, null); + Assert.assertEquals(1, context.getReplicatedLog().size()); - RaftActorBehavior follower = createBehavior(context); + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); - follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); - final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof RequestVoteReply) { - RequestVoteReply reply = (RequestVoteReply) in; - return reply.isVoteGranted(); - } else { - throw noMatch(); - } - } - }.get(); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - assertEquals(true, out); - } - }; - }}; + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); } @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ - new JavaTestKit(getSystem()) {{ + public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); + } - new Within(duration("1 seconds")) { - protected void run() { + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception { + logStart("testHandleFirstAppendEntries"); - RaftActorContext context = createActorContext(getTestActor()); + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); - context.getTermInformation().update(1000, "test"); + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); - RaftActorBehavior follower = createBehavior(context); + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); - follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999)); + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); - final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof RequestVoteReply) { - RequestVoteReply reply = (RequestVoteReply) in; - return reply.isVoteGranted(); - } else { - throw noMatch(); - } - } - }.get(); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - assertEquals(false, out); - } - }; - }}; + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); } + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 105, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); + } + + @Test + public void testHandleSyncUpAppendEntries() throws Exception { + logStart("testHandleSyncUpAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + followerActor.underlyingActor().clear(); + + // Sending the same message again should not generate another message + + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertNull(syncStatus); + + } + + @Test + public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + + } + + + @Test + public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + + } + + /** * This test verifies that when an AppendEntries RPC is received by a RaftActor * with a commitIndex that is greater than what has been applied to the @@ -165,32 +435,25 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { */ @Test public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { - new JavaTestKit(getSystem()) {{ + logStart("testHandleAppendEntriesWithNewerCommitIndex"); - RaftActorContext context = - createActorContext(); + MockRaftActorContext context = createActorContext(); - context.setLastApplied(100); - setLastLogEntry((MockRaftActorContext) context, 1, 100, + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, new MockRaftActorContext.MockPayload("")); - ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99); - - List entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101, - new MockRaftActorContext.MockPayload("foo")) - ); + context.getReplicatedLog().setSnapshotIndex(99); - // The new commitIndex is 101 - AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 100, 1, entries, 101); + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); - RaftActorBehavior raftBehavior = - createBehavior(context).handleMessage(getRef(), appendEntries); + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); - assertEquals(101L, context.getLastApplied()); + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); - }}; + assertEquals("getLastApplied", 101L, context.getLastApplied()); } /** @@ -201,58 +464,30 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * @throws Exception */ @Test - public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() - throws Exception { - new JavaTestKit(getSystem()) {{ - - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); - - // First set the receivers term to lower number - context.getTermInformation().update(95, "test"); - - // Set the last log entry term for the receiver to be greater than - // what we will be sending as the prevLogTerm in AppendEntries - MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog = - setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload("")); - - // AppendEntries is now sent with a bigger term - // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); + public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() { + logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm"); - RaftActorBehavior behavior = createBehavior(context); + MockRaftActorContext context = createActorContext(); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); + // First set the receivers term to lower number + context.getTermInformation().update(95, "test"); - RaftActorBehavior raftBehavior = - behavior.handleMessage(getRef(), appendEntries); + // AppendEntries is now sent with a bigger term + // this will set the receivers term to be the same as the sender's term + AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0); - assertEquals(expected, raftBehavior); + follower = createBehavior(context); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - assertEquals(false, out); + Assert.assertSame(follower, newBehavior); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); - }}; + assertEquals("isSuccess", false, reply.isSuccess()); } - - /** * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver match that the new @@ -262,165 +497,244 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * @throws Exception */ @Test - public void testHandleAppendEntriesAddNewEntries() throws Exception { - new JavaTestKit(getSystem()) {{ - - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + public void testHandleAppendEntriesAddNewEntries() { + logStart("testHandleAppendEntriesAddNewEntries"); - // First set the receivers term to lower number - context.getTermInformation().update(1, "test"); + MockRaftActorContext context = createActorContext(); - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); - context.setReplicatedLog(log); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three"))); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four"))); + context.setReplicatedLog(log); - // Send appendEntries with the same term as was set on the receiver - // before the new behavior was created (1 in this case) - // This will not work for a Candidate because as soon as a Candidate - // is created it increments the term - AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 2, 1, entries, 4); + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 3, "three")); + entries.add(newReplicatedLogEntry(1, 4, "four")); - RaftActorBehavior behavior = createBehavior(context); + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + short leaderPayloadVersion = 10; + String leaderId = "leader-1"; + AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); + follower = createBehavior(context); - RaftActorBehavior raftBehavior = - behavior.handleMessage(getRef(), appendEntries); + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - assertEquals(expected, raftBehavior); - assertEquals(5, log.last().getIndex() + 1); - assertNotNull(log.get(3)); - assertNotNull(log.get(4)); + Assert.assertSame(follower, newBehavior); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + assertEquals("Next index", 5, log.last().getIndex() + 1); + assertEquals("Entry 3", entries.get(0), log.get(3)); + assertEquals("Entry 4", entries.get(1), log.get(4)); - assertEquals(true, out); + assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion()); + assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId()); - - }}; + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } - - /** * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver are out-of-sync that * the log is first corrected by removing the out of sync entries from the * log and then adding in the new entries sent with the AppendEntries message - * - * @throws Exception */ @Test - public void testHandleAppendEntriesCorrectReceiverLogEntries() - throws Exception { - new JavaTestKit(getSystem()) {{ + public void testHandleAppendEntriesCorrectReceiverLogEntries() { + logStart("testHandleAppendEntriesCorrectReceiverLogEntries"); + + MockRaftActorContext context = createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(2, 2, "two-1")); + entries.add(newReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); + + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + // The entry at index 2 will be found out-of-sync with the leader + // and will be removed + // Then the two new entries will be added to the log + // Thus making the log to have 4 entries + assertEquals("Next index", 4, log.last().getIndex() + 1); + //assertEquals("Entry 2", entries.get(0), log.get(2)); + + assertEquals("Entry 1 data", "one", log.get(1).getData().toString()); + + // Check that the entry at index 2 has the new data + assertEquals("Entry 2", entries.get(0), log.get(2)); + + assertEquals("Entry 3", entries.get(1), log.get(3)); + + expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3); + } + + @Test + public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() { + logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(2, 2, "two-1")); + entries.add(newReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); + + context.setRaftPolicy(createRaftPolicy(false, true)); + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true); + } + + @Test + public void testHandleAppendEntriesPreviousLogEntryMissing(){ + logStart("testHandleAppendEntriesPreviousLogEntryMissing"); + + MockRaftActorContext context = createActorContext(); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 4, "four")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0); + + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2); + } + + @Test + public void testHandleAppendEntriesWithExistingLogEntry() { + logStart("testHandleAppendEntriesWithExistingLogEntry"); + + MockRaftActorContext context = createActorContext(); + + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + + context.setReplicatedLog(log); - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); + // Send the last entry again. + List entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one")); - // First set the receivers term to lower number - context.getTermInformation().update(2, "test"); + follower = createBehavior(context); - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0)); - context.setReplicatedLog(log); + assertEquals("Next index", 2, log.last().getIndex() + 1); + assertEquals("Entry 1", entries.get(0), log.get(1)); - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1"))); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three"))); + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1); - // Send appendEntries with the same term as was set on the receiver - // before the new behavior was created (1 in this case) - // This will not work for a Candidate because as soon as a Candidate - // is created it increments the term - AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 1, 1, entries, 3); + // Send the last entry again and also a new one. - RaftActorBehavior behavior = createBehavior(context); + entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two")); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); + leaderActor.underlyingActor().clear(); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0)); - RaftActorBehavior raftBehavior = - behavior.handleMessage(getRef(), appendEntries); + assertEquals("Next index", 3, log.last().getIndex() + 1); + assertEquals("Entry 1", entries.get(0), log.get(1)); + assertEquals("Entry 2", entries.get(1), log.get(2)); - assertEquals(expected, raftBehavior); + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2); + } + + @Test + public void testHandleAppendEntriesAfterInstallingSnapshot(){ + logStart("testHandleAppendAfterInstallingSnapshot"); - // The entry at index 2 will be found out-of-sync with the leader - // and will be removed - // Then the two new entries will be added to the log - // Thus making the log to have 4 entries - assertEquals(4, log.last().getIndex() + 1); - assertNotNull(log.get(2)); + MockRaftActorContext context = createActorContext(); - assertEquals("one", log.get(1).getData().toString()); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); - // Check that the entry at index 2 has the new data - assertEquals("two-1", log.get(2).getData().toString()); + // Set up a log as if it has been snapshotted + log.setSnapshotIndex(3); + log.setSnapshotTerm(1); - assertEquals("three", log.get(3).getData().toString()); + context.setReplicatedLog(log); - assertNotNull(log.get(3)); + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 4, "four")); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0); - assertEquals(true, out); + follower = createBehavior(context); + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - }}; + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } @@ -432,131 +746,325 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { */ @Test public void testHandleInstallSnapshot() throws Exception { - JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{ - - ActorRef leaderActor = getSystem().actorOf(Props.create( - MessageCollectorActor.class)); - - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(getRef()); - - Follower follower = (Follower)createBehavior(context); - - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); - ByteString chunkData = ByteString.EMPTY; - int offset = 0; - int snapshotLength = bsSnapshot.size(); - int i = 1; - - do { - chunkData = getNextChunk(bsSnapshot, offset); - final InstallSnapshot installSnapshot = - new InstallSnapshot(1, "leader-1", i, 1, - chunkData, i, 3); - follower.handleMessage(leaderActor, installSnapshot); - offset = offset + 50; - i++; - } while ((offset+50) < snapshotLength); - - final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3); - follower.handleMessage(leaderActor, installSnapshot3); - - String[] matches = new ReceiveWhile(String.class, duration("2 seconds")) { - @Override - protected String match(Object o) throws Exception { - if (o instanceof ApplySnapshot) { - ApplySnapshot as = (ApplySnapshot)o; - if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) { - return "applySnapshot-lastIndex-mismatch"; - } - if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) { - return "applySnapshot-lastAppliedTerm-mismatch"; - } - if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) { - return "applySnapshot-lastAppliedIndex-mismatch"; - } - if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) { - return "applySnapshot-lastTerm-mismatch"; - } - return "applySnapshot"; - } - - return "ignoreCase"; - } - }.get(); - - String applySnapshotMatch = ""; - for (String reply: matches) { - if (reply.startsWith("applySnapshot")) { - applySnapshotMatch = reply; - } - } + logStart("testHandleInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + context.getTermInformation().update(1, "leader"); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int lastIncludedIndex = 1; + int chunkIndex = 1; + InstallSnapshot lastInstallSnapshot = null; + + for(int i = 0; i < totalChunks; i++) { + ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, chunkIndex, totalChunks); + follower.handleMessage(leaderActor, lastInstallSnapshot); + offset = offset + 50; + lastIncludedIndex++; + chunkIndex++; + } + + ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + ApplySnapshot.class); + Snapshot snapshot = applySnapshot.getSnapshot(); + assertNotNull(lastInstallSnapshot); + assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex()); + assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(), + snapshot.getLastAppliedTerm()); + assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(), + snapshot.getLastAppliedIndex()); + assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); + Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); + assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); + applySnapshot.getCallback().onSuccess(); + + List replies = MessageCollectorActor.getAllMatching( + leaderActor, InstallSnapshotReply.class); + assertEquals("InstallSnapshotReply count", totalChunks, replies.size()); + + chunkIndex = 1; + for(InstallSnapshotReply reply: replies) { + assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex()); + assertEquals("getTerm", 1, reply.getTerm()); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); + } + + assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); + } + + + /** + * Verify that when an AppendEntries is sent to a follower during a snapshot install + * the Follower short-circuits the processing of the AppendEntries message. + * + * @throws Exception + */ + @Test + public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); - assertEquals("applySnapshot", applySnapshotMatch); + follower = createBehavior(context); - Object messages = executeLocalOperation(leaderActor, "get-all-messages"); + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int lastIncludedIndex = 1; - assertNotNull(messages); - assertTrue(messages instanceof List); - List listMessages = (List) messages; + // Check that snapshot installation is not in progress + assertNull(((Follower) follower).getSnapshotTracker()); - int installSnapshotReplyReceivedCount = 0; - for (Object message: listMessages) { - if (message instanceof InstallSnapshotReply) { - ++installSnapshotReplyReceivedCount; - } + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(((Follower) follower).getSnapshotTracker()); + + // Send an append entry + AppendEntries appendEntries = mock(AppendEntries.class); + doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm(); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm()); + + // We should not hit the code that needs to look at prevLogIndex because we are short circuiting + verify(appendEntries, never()).getPrevLogIndex(); + + } + + @Test + public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { + logStart("testInitialSyncUpWithHandleInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int lastIncludedIndex = 1; + int chunkIndex = 1; + InstallSnapshot lastInstallSnapshot = null; + + for(int i = 0; i < totalChunks; i++) { + ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, chunkIndex, totalChunks); + follower.handleMessage(leaderActor, lastInstallSnapshot); + offset = offset + 50; + lastIncludedIndex++; + chunkIndex++; + } + + FollowerInitialSyncUpStatus syncStatus = + MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + } + + @Test + public void testHandleOutOfSequenceInstallSnapshot() throws Exception { + logStart("testHandleOutOfSequenceInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + + InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1, + getNextChunk(bsSnapshot, 10, 50), 3, 3); + follower.handleMessage(leaderActor, installSnapshot); + + InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + InstallSnapshotReply.class); + + assertEquals("isSuccess", false, reply.isSuccess()); + assertEquals("getChunkIndex", -1, reply.getChunkIndex()); + assertEquals("getTerm", 1, reply.getTerm()); + assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); + + assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); + } + + @Test + public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + MockRaftActorContext context = createActorContext(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + follower = createBehavior(context); + + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + } + + @Test + public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){ + MockRaftActorContext context = createActorContext(); + context.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public FiniteDuration getElectionTimeOutInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); } + }); - assertEquals(3, installSnapshotReplyReceivedCount); + context.setRaftPolicy(createRaftPolicy(false, false)); - }}; + follower = createBehavior(context); + + MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500); + } + + @Test + public void testElectionScheduledWhenAnyRaftRPCReceived(){ + MockRaftActorContext context = createActorContext(); + follower = createBehavior(context); + follower.handleMessage(leaderActor, new RaftRPC() { + @Override + public long getTerm() { + return 100; + } + }); + assertEquals("schedule election", 1, getElectionTimeoutCount(follower)); } - public Object executeLocalOperation(ActorRef actor, Object message) throws Exception { - return MessageCollectorActor.getAllMessages(actor); + @Test + public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){ + MockRaftActorContext context = createActorContext(); + follower = createBehavior(context); + follower.handleMessage(leaderActor, "non-raft-rpc"); + assertEquals("schedule election", 0, getElectionTimeoutCount(follower)); } - public ByteString getNextChunk (ByteString bs, int offset){ + public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){ int snapshotLength = bs.size(); int start = offset; - int size = 50; - if (50 > snapshotLength) { + int size = chunkSize; + if (chunkSize > snapshotLength) { size = snapshotLength; } else { - if ((start + 50) > snapshotLength) { + if ((start + chunkSize) > snapshotLength) { size = snapshotLength - start; } } return bs.substring(start, start + size); } - private ByteString toByteString(Map state) { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(state); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } catch (IOException e) { - org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e); + private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, + String expFollowerId, long expLogLastTerm, long expLogLastIndex) { + expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false); + } + + private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, + String expFollowerId, long expLogLastTerm, long expLogLastIndex, + boolean expForceInstallSnapshot) { + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + assertEquals("isSuccess", expSuccess, reply.isSuccess()); + assertEquals("getTerm", expTerm, reply.getTerm()); + assertEquals("getFollowerId", expFollowerId, reply.getFollowerId()); + assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm()); + assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); + assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); + assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot()); + } + + + private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { + return new MockRaftActorContext.MockReplicatedLogEntry(term, index, + new MockRaftActorContext.MockPayload(data)); + } + + private ByteString createSnapshot(){ + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + return toByteString(followerSnapshot); + } + + @Override + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + ActorRef actorRef, RaftRPC rpc) throws Exception { + super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); + + String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null; + assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor()); + } + + @Override + protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef replyActor) + throws Exception { + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + } + + private static class TestFollower extends Follower { + + int electionTimeoutCount = 0; + + public TestFollower(RaftActorContext context) { + super(context); + } + + @Override + protected void scheduleElection(FiniteDuration interval) { + electionTimeoutCount++; + super.scheduleElection(interval); + } + + public int getElectionTimeoutCount() { + return electionTimeoutCount; } - return null; } }