X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=b01fd33914f9500c8537cc86032aee924b54f7ed;hp=6b0857351df132fd30b10406dae2acfe3cbd1237;hb=95d7b8820236d16cb7e37c4a95fcae6f6d55581e;hpb=03e752cbd625921ece92c5281cd4e1a8c81b3210 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 6b0857351d..b01fd33914 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,44 +1,84 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; -import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.duration.FiniteDuration; + +public class FollowerTest extends AbstractRaftActorBehaviorTest { + + private final TestActorRef followerActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower")); + + private final TestActorRef leaderActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader")); -public class FollowerTest extends AbstractRaftActorBehaviorTest { + private Follower follower; - private final ActorRef followerActor = getSystem().actorOf(Props.create( - DoNothingActor.class)); + private final short payloadVersion = 5; + @Override + @After + public void tearDown() throws Exception { + if(follower != null) { + follower.close(); + } + + super.tearDown(); + } - @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { - return new Follower(actorContext); + @Override + protected Follower createBehavior(RaftActorContext actorContext) { + return spy(new Follower(actorContext)); } @Override @@ -48,119 +88,376 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef){ - return new MockRaftActorContext("test", getSystem(), actorRef); + MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); + context.setPayloadVersion(payloadVersion ); + return context; } @Test public void testThatAnElectionTimeoutIsTriggered(){ - new JavaTestKit(getSystem()) {{ + MockRaftActorContext actorContext = createActorContext(); + follower = new Follower(actorContext); + + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class, + actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis()); + } - new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) { - @Override - protected void run() { + @Test + public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() { + logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived"); - Follower follower = new Follower(createActorContext(getTestActor())); + MockRaftActorContext context = createActorContext(); + follower = new Follower(context); - final Boolean out = new ExpectMsg(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if (in instanceof ElectionTimeout) { - return true; - } else { - throw noMatch(); - } - } - }.get(); + Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(), + TimeUnit.MILLISECONDS); + RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); - assertEquals(true, out); - } - }; - }}; + assertTrue(raftBehavior instanceof Candidate); } @Test - public void testHandleElectionTimeout(){ - RaftActorContext raftActorContext = createActorContext(); - Follower follower = - new Follower(raftActorContext); + public void testHandleElectionTimeoutWhenLeaderMessageReceived() { + logStart("testHandleElectionTimeoutWhenLeaderMessageReceived"); + + MockRaftActorContext context = createActorContext(); + ((DefaultConfigParamsImpl) context.getConfigParams()). + setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4); + + follower = new Follower(context); + context.setCurrentBehavior(follower); + + Uninterruptibles.sleepUninterruptibly(context.getConfigParams(). + getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + -1, -1, (short) 1)); + + Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS); + RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); + assertTrue(raftBehavior instanceof Follower); + + Uninterruptibles.sleepUninterruptibly(context.getConfigParams(). + getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + -1, -1, (short) 1)); + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); + assertTrue(raftBehavior instanceof Follower); + } - RaftActorBehavior raftBehavior = - follower.handleMessage(followerActor, new ElectionTimeout()); + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() { + logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull"); - assertTrue(raftBehavior instanceof Candidate); + MockRaftActorContext context = createActorContext(); + long term = 1000; + context.getTermInformation().update(term, null); + + follower = createBehavior(context); + + follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999)); + + RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); + + assertEquals("isVoteGranted", true, reply.isVoteGranted()); + assertEquals("getTerm", term, reply.getTerm()); + verify(follower).scheduleElection(any(FiniteDuration.class)); } @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ - new JavaTestKit(getSystem()) {{ + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId"); + + MockRaftActorContext context = createActorContext(); + long term = 1000; + context.getTermInformation().update(term, "test"); - new Within(duration("1 seconds")) { - @Override - protected void run() { + follower = createBehavior(context); - RaftActorContext context = createActorContext(getTestActor()); + follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999)); - context.getTermInformation().update(1000, null); + RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); - RaftActorBehavior follower = createBehavior(context); + assertEquals("isVoteGranted", false, reply.isVoteGranted()); + verify(follower, never()).scheduleElection(any(FiniteDuration.class)); + } - follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); - final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if (in instanceof RequestVoteReply) { - RequestVoteReply reply = (RequestVoteReply) in; - return reply.isVoteGranted(); - } else { - throw noMatch(); - } - } - }.get(); + @Test + public void testHandleFirstAppendEntries() throws Exception { + logStart("testHandleFirstAppendEntries"); - assertEquals(true, out); - } - }; - }}; + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + Assert.assertEquals(1, context.getReplicatedLog().size()); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); } @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ - new JavaTestKit(getSystem()) {{ + public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 105, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0); - new Within(duration("1 seconds")) { - @Override - protected void run() { + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); - RaftActorContext context = createActorContext(getTestActor()); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); + } + + @Test + public void testHandleSyncUpAppendEntries() throws Exception { + logStart("testHandleSyncUpAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + followerActor.underlyingActor().clear(); + + // Sending the same message again should not generate another message + + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertNull(syncStatus); + + } + + @Test + public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + + } + + + @Test + public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); - context.getTermInformation().update(1000, "test"); + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); - RaftActorBehavior follower = createBehavior(context); + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); + follower.handleMessage(leaderActor, appendEntries); - follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999)); + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); - final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if (in instanceof RequestVoteReply) { - RequestVoteReply reply = (RequestVoteReply) in; - return reply.isVoteGranted(); - } else { - throw noMatch(); - } - } - }.get(); + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); - assertEquals(false, out); - } - }; - }}; } + /** * This test verifies that when an AppendEntries RPC is received by a RaftActor * with a commitIndex that is greater than what has been applied to the @@ -171,32 +468,25 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { */ @Test public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { - new JavaTestKit(getSystem()) {{ + logStart("testHandleAppendEntriesWithNewerCommitIndex"); - RaftActorContext context = - createActorContext(); + MockRaftActorContext context = createActorContext(); - context.setLastApplied(100); - setLastLogEntry((MockRaftActorContext) context, 1, 100, + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, new MockRaftActorContext.MockPayload("")); - ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99); - - List entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101, - new MockRaftActorContext.MockPayload("foo")) - ); + context.getReplicatedLog().setSnapshotIndex(99); - // The new commitIndex is 101 - AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); - RaftActorBehavior raftBehavior = - createBehavior(context).handleMessage(getRef(), appendEntries); + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); - assertEquals(101L, context.getLastApplied()); + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); - }}; + assertEquals("getLastApplied", 101L, context.getLastApplied()); } /** @@ -207,58 +497,30 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * @throws Exception */ @Test - public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() - throws Exception { - new JavaTestKit(getSystem()) {{ - - MockRaftActorContext context = createActorContext(); - - // First set the receivers term to lower number - context.getTermInformation().update(95, "test"); + public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() { + logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm"); - // Set the last log entry term for the receiver to be greater than - // what we will be sending as the prevLogTerm in AppendEntries - MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog = - setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload("")); + MockRaftActorContext context = createActorContext(); - // AppendEntries is now sent with a bigger term - // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101, -1); + // First set the receivers term to lower number + context.getTermInformation().update(95, "test"); - RaftActorBehavior behavior = createBehavior(context); + // AppendEntries is now sent with a bigger term + // this will set the receivers term to be the same as the sender's term + AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); + follower = createBehavior(context); - RaftActorBehavior raftBehavior = - behavior.handleMessage(getRef(), appendEntries); + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - assertEquals(expected, raftBehavior); + Assert.assertSame(follower, newBehavior); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); - assertEquals(false, out); - - - }}; + assertEquals("isSuccess", false, reply.isSuccess()); } - - /** * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver match that the new @@ -268,278 +530,244 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * @throws Exception */ @Test - public void testHandleAppendEntriesAddNewEntries() throws Exception { - new JavaTestKit(getSystem()) {{ + public void testHandleAppendEntriesAddNewEntries() { + logStart("testHandleAppendEntriesAddNewEntries"); - MockRaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); - // First set the receivers term to lower number - context.getTermInformation().update(1, "test"); + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); - context.setReplicatedLog(log); + context.setReplicatedLog(log); - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three"))); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four"))); + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 3, "three")); + entries.add(newReplicatedLogEntry(1, 4, "four")); - // Send appendEntries with the same term as was set on the receiver - // before the new behavior was created (1 in this case) - // This will not work for a Candidate because as soon as a Candidate - // is created it increments the term - AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1); + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + short leaderPayloadVersion = 10; + String leaderId = "leader-1"; + AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion); - RaftActorBehavior behavior = createBehavior(context); + follower = createBehavior(context); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - RaftActorBehavior raftBehavior = - behavior.handleMessage(getRef(), appendEntries); + Assert.assertSame(follower, newBehavior); - assertEquals(expected, raftBehavior); - assertEquals(5, log.last().getIndex() + 1); - assertNotNull(log.get(3)); - assertNotNull(log.get(4)); + assertEquals("Next index", 5, log.last().getIndex() + 1); + assertEquals("Entry 3", entries.get(0), log.get(3)); + assertEquals("Entry 4", entries.get(1), log.get(4)); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion()); + assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId()); - assertEquals(true, out); - - - }}; + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } - - /** * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver are out-of-sync that * the log is first corrected by removing the out of sync entries from the * log and then adding in the new entries sent with the AppendEntries message - * - * @throws Exception */ @Test - public void testHandleAppendEntriesCorrectReceiverLogEntries() - throws Exception { - new JavaTestKit(getSystem()) {{ + public void testHandleAppendEntriesCorrectReceiverLogEntries() { + logStart("testHandleAppendEntriesCorrectReceiverLogEntries"); - MockRaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); - // First set the receivers term to lower number - context.getTermInformation().update(2, "test"); + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); - context.setReplicatedLog(log); + context.setReplicatedLog(log); - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1"))); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three"))); + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(2, 2, "two-1")); + entries.add(newReplicatedLogEntry(2, 3, "three")); - // Send appendEntries with the same term as was set on the receiver - // before the new behavior was created (1 in this case) - // This will not work for a Candidate because as soon as a Candidate - // is created it increments the term - AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 1, 1, entries, 3, -1); + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); - RaftActorBehavior behavior = createBehavior(context); + follower = createBehavior(context); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - RaftActorBehavior raftBehavior = - behavior.handleMessage(getRef(), appendEntries); + Assert.assertSame(follower, newBehavior); - assertEquals(expected, raftBehavior); + // The entry at index 2 will be found out-of-sync with the leader + // and will be removed + // Then the two new entries will be added to the log + // Thus making the log to have 4 entries + assertEquals("Next index", 4, log.last().getIndex() + 1); + //assertEquals("Entry 2", entries.get(0), log.get(2)); - // The entry at index 2 will be found out-of-sync with the leader - // and will be removed - // Then the two new entries will be added to the log - // Thus making the log to have 4 entries - assertEquals(4, log.last().getIndex() + 1); - assertNotNull(log.get(2)); + assertEquals("Entry 1 data", "one", log.get(1).getData().toString()); - assertEquals("one", log.get(1).getData().toString()); + // Check that the entry at index 2 has the new data + assertEquals("Entry 2", entries.get(0), log.get(2)); - // Check that the entry at index 2 has the new data - assertEquals("two-1", log.get(2).getData().toString()); + assertEquals("Entry 3", entries.get(1), log.get(3)); - assertEquals("three", log.get(3).getData().toString()); + expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3); + } - assertNotNull(log.get(3)); + @Test + public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() { + logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot"); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + MockRaftActorContext context = createActorContext(); - assertEquals(true, out); + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); - }}; + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(2, 2, "two-1")); + entries.add(newReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); + + context.setRaftPolicy(createRaftPolicy(false, true)); + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true); } @Test public void testHandleAppendEntriesPreviousLogEntryMissing(){ - new JavaTestKit(getSystem()) {{ + logStart("testHandleAppendEntriesPreviousLogEntryMissing"); - MockRaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); - context.setReplicatedLog(log); + context.setReplicatedLog(log); - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1"))); + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 4, "four")); - AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 3, 1, entries, 4, -1); + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0); - RaftActorBehavior behavior = createBehavior(context); + follower = createBehavior(context); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - RaftActorBehavior raftBehavior = - behavior.handleMessage(getRef(), appendEntries); + Assert.assertSame(follower, newBehavior); - assertEquals(expected, raftBehavior); + expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2); + } - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + @Test + public void testHandleAppendEntriesWithExistingLogEntry() { + logStart("testHandleAppendEntriesWithExistingLogEntry"); - assertEquals(false, out); + MockRaftActorContext context = createActorContext(); - }}; + context.getTermInformation().update(1, "test"); - } + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); - @Test - public void testHandleAppendAfterInstallingSnapshot(){ - new JavaTestKit(getSystem()) {{ + context.setReplicatedLog(log); + + // Send the last entry again. + List entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one")); + + follower = createBehavior(context); - MockRaftActorContext context = createActorContext(); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0)); + assertEquals("Next index", 2, log.last().getIndex() + 1); + assertEquals("Entry 1", entries.get(0), log.get(1)); - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1); - // Set up a log as if it has been snapshotted - log.setSnapshotIndex(3); - log.setSnapshotTerm(1); + // Send the last entry again and also a new one. - context.setReplicatedLog(log); + entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two")); - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1"))); + leaderActor.underlyingActor().clear(); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0)); - AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 3, 1, entries, 4, 3); + assertEquals("Next index", 3, log.last().getIndex() + 1); + assertEquals("Entry 1", entries.get(0), log.get(1)); + assertEquals("Entry 2", entries.get(1), log.get(2)); - RaftActorBehavior behavior = createBehavior(context); + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2); + } + + @Test + public void testHandleAppendEntriesAfterInstallingSnapshot(){ + logStart("testHandleAppendAfterInstallingSnapshot"); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown"); + MockRaftActorContext context = createActorContext(); - RaftActorBehavior raftBehavior = - behavior.handleMessage(getRef(), appendEntries); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); - assertEquals(expected, raftBehavior); + // Set up a log as if it has been snapshotted + log.setSnapshotIndex(3); + log.setSnapshotTerm(1); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - @Override - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + context.setReplicatedLog(log); - assertEquals(true, out); + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 4, "four")); - }}; + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0); + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } @@ -551,180 +779,376 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { */ @Test public void testHandleInstallSnapshot() throws Exception { - JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{ - - ActorRef leaderActor = getSystem().actorOf(Props.create( - MessageCollectorActor.class)); - - MockRaftActorContext context = createActorContext(getRef()); - - Follower follower = (Follower)createBehavior(context); - - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); - ByteString chunkData = ByteString.EMPTY; - int offset = 0; - int snapshotLength = bsSnapshot.size(); - int i = 1; - int chunkIndex = 1; - - do { - chunkData = getNextChunk(bsSnapshot, offset); - final InstallSnapshot installSnapshot = - new InstallSnapshot(1, "leader-1", i, 1, - chunkData, chunkIndex, 3); - follower.handleMessage(leaderActor, installSnapshot); - offset = offset + 50; - i++; - chunkIndex++; - } while ((offset+50) < snapshotLength); - - final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, chunkIndex, 3); - follower.handleMessage(leaderActor, installSnapshot3); - - String[] matches = new ReceiveWhile(String.class, duration("2 seconds")) { - @Override - protected String match(Object o) throws Exception { - if (o instanceof ApplySnapshot) { - ApplySnapshot as = (ApplySnapshot)o; - if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) { - return "applySnapshot-lastIndex-mismatch"; - } - if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) { - return "applySnapshot-lastAppliedTerm-mismatch"; - } - if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) { - return "applySnapshot-lastAppliedIndex-mismatch"; - } - if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) { - return "applySnapshot-lastTerm-mismatch"; - } - return "applySnapshot"; - } - - return "ignoreCase"; - } - }.get(); - - // Verify that after a snapshot is successfully applied the collected snapshot chunks is reset to empty - assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected()); - - String applySnapshotMatch = ""; - for (String reply: matches) { - if (reply.startsWith("applySnapshot")) { - applySnapshotMatch = reply; - } - } + logStart("testHandleInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + context.getTermInformation().update(1, "leader"); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; + int chunkIndex = 1; + InstallSnapshot lastInstallSnapshot = null; + + for(int i = 0; i < totalChunks; i++) { + byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, chunkIndex, totalChunks); + follower.handleMessage(leaderActor, lastInstallSnapshot); + offset = offset + 50; + lastIncludedIndex++; + chunkIndex++; + } - assertEquals("applySnapshot", applySnapshotMatch); + ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + ApplySnapshot.class); + Snapshot snapshot = applySnapshot.getSnapshot(); + assertNotNull(lastInstallSnapshot); + assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex()); + assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(), + snapshot.getLastAppliedTerm()); + assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(), + snapshot.getLastAppliedIndex()); + assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); + Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); + assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); + applySnapshot.getCallback().onSuccess(); + + List replies = MessageCollectorActor.getAllMatching( + leaderActor, InstallSnapshotReply.class); + assertEquals("InstallSnapshotReply count", totalChunks, replies.size()); + + chunkIndex = 1; + for(InstallSnapshotReply reply: replies) { + assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex()); + assertEquals("getTerm", 1, reply.getTerm()); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); + } - Object messages = executeLocalOperation(leaderActor, "get-all-messages"); + assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker()); + } - assertNotNull(messages); - assertTrue(messages instanceof List); - List listMessages = (List) messages; - int installSnapshotReplyReceivedCount = 0; - for (Object message: listMessages) { - if (message instanceof InstallSnapshotReply) { - ++installSnapshotReplyReceivedCount; - } - } + /** + * Verify that when an AppendEntries is sent to a follower during a snapshot install + * the Follower short-circuits the processing of the AppendEntries message. + * + * @throws Exception + */ + @Test + public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); - assertEquals(3, installSnapshotReplyReceivedCount); + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; - }}; + // Check that snapshot installation is not in progress + assertNull(follower.getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(follower.getSnapshotTracker()); + + // Send an append entry + AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm()); + + assertNotNull(follower.getSnapshotTracker()); + } + + @Test + public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; + + // Check that snapshot installation is not in progress + assertNull(follower.getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(follower.getSnapshotTracker()); + + // Send appendEntries with a new term and leader. + AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", 2, reply.getLogLastIndex()); + assertEquals("getLogLastTerm", 2, reply.getLogLastTerm()); + assertEquals("getTerm", 2, reply.getTerm()); + + assertNull(follower.getSnapshotTracker()); + } + + @Test + public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { + logStart("testInitialSyncUpWithHandleInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + context.setCommitIndex(-1); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; + int chunkIndex = 1; + InstallSnapshot lastInstallSnapshot = null; + + for(int i = 0; i < totalChunks; i++) { + byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, chunkIndex, totalChunks); + follower.handleMessage(leaderActor, lastInstallSnapshot); + offset = offset + 50; + lastIncludedIndex++; + chunkIndex++; + } + + FollowerInitialSyncUpStatus syncStatus = + MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); } @Test public void testHandleOutOfSequenceInstallSnapshot() throws Exception { - JavaTestKit javaTestKit = new JavaTestKit(getSystem()) { - { + logStart("testHandleOutOfSequenceInstallSnapshot"); - ActorRef leaderActor = getSystem().actorOf(Props.create( - MessageCollectorActor.class)); + MockRaftActorContext context = createActorContext(); - MockRaftActorContext context = createActorContext(getRef()); + follower = createBehavior(context); - Follower follower = (Follower) createBehavior(context); + ByteString bsSnapshot = createSnapshot(); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); + InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1, + getNextChunk(bsSnapshot, 10, 50), 3, 3); + follower.handleMessage(leaderActor, installSnapshot); - ByteString bsSnapshot = toByteString(followerSnapshot); + InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + InstallSnapshotReply.class); - final InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader-1", 3, 1, getNextChunk(bsSnapshot, 10), 3, 3); - follower.handleMessage(leaderActor, installSnapshot); + assertEquals("isSuccess", false, reply.isSuccess()); + assertEquals("getChunkIndex", -1, reply.getChunkIndex()); + assertEquals("getTerm", 1, reply.getTerm()); + assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); + + assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker()); + } + + @Test + public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + MockRaftActorContext context = createActorContext(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + follower = createBehavior(context); + + TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow); + assertTrue("Expected Candidate", newBehavior instanceof Candidate); + } - Object messages = executeLocalOperation(leaderActor, "get-all-messages"); + @Test + public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled(){ + MockRaftActorContext context = createActorContext(); + context.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public FiniteDuration getElectionTimeOutInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); - assertNotNull(messages); - assertTrue(messages instanceof List); - List listMessages = (List) messages; + context.setRaftPolicy(createRaftPolicy(false, false)); - int installSnapshotReplyReceivedCount = 0; - for (Object message: listMessages) { - if (message instanceof InstallSnapshotReply) { - ++installSnapshotReplyReceivedCount; - } - } + follower = createBehavior(context); - assertEquals(1, installSnapshotReplyReceivedCount); - InstallSnapshotReply reply = (InstallSnapshotReply) listMessages.get(0); - assertEquals(false, reply.isSuccess()); - assertEquals(-1, reply.getChunkIndex()); - assertEquals(ByteString.EMPTY, follower.getSnapshotChunksCollected()); + TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow); + assertSame("handleMessage result", follower, newBehavior); + } + @Test + public void testFollowerSchedulesElectionIfNonVoting(){ + MockRaftActorContext context = createActorContext(); + context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false)))); + ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval( + FiniteDuration.apply(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1); + + follower = new Follower(context, "leader", (short)1); + + ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor, + ElectionTimeout.class); + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout); + assertSame("handleMessage result", follower, newBehavior); + assertNull("Expected null leaderId", follower.getLeaderId()); + } - }}; + @Test + public void testElectionScheduledWhenAnyRaftRPCReceived(){ + MockRaftActorContext context = createActorContext(); + follower = createBehavior(context); + follower.handleMessage(leaderActor, new RaftRPC() { + private static final long serialVersionUID = 1L; + + @Override + public long getTerm() { + return 100; + } + }); + verify(follower).scheduleElection(any(FiniteDuration.class)); } - public Object executeLocalOperation(ActorRef actor, Object message) throws Exception { - return MessageCollectorActor.getAllMessages(actor); + @Test + public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){ + MockRaftActorContext context = createActorContext(); + follower = createBehavior(context); + follower.handleMessage(leaderActor, "non-raft-rpc"); + verify(follower, never()).scheduleElection(any(FiniteDuration.class)); } - public ByteString getNextChunk (ByteString bs, int offset){ + public byte[] getNextChunk (ByteString bs, int offset, int chunkSize){ int snapshotLength = bs.size(); int start = offset; - int size = 50; - if (50 > snapshotLength) { + int size = chunkSize; + if (chunkSize > snapshotLength) { size = snapshotLength; } else { - if ((start + 50) > snapshotLength) { + if (start + chunkSize > snapshotLength) { size = snapshotLength - start; } } - return bs.substring(start, start + size); - } - - private ByteString toByteString(Map state) { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(state); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } catch (IOException e) { - org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e); - } - return null; + + byte[] nextChunk = new byte[size]; + bs.copyTo(nextChunk, start, 0, size); + return nextChunk; + } + + private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, + String expFollowerId, long expLogLastTerm, long expLogLastIndex) { + expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false); + } + + private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, + String expFollowerId, long expLogLastTerm, long expLogLastIndex, + boolean expForceInstallSnapshot) { + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + assertEquals("isSuccess", expSuccess, reply.isSuccess()); + assertEquals("getTerm", expTerm, reply.getTerm()); + assertEquals("getFollowerId", expFollowerId, reply.getFollowerId()); + assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm()); + assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); + assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); + assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot()); + } + + + private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { + return new MockRaftActorContext.MockReplicatedLogEntry(term, index, + new MockRaftActorContext.MockPayload(data)); + } + + private ByteString createSnapshot(){ + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + return toByteString(followerSnapshot); + } + + @Override + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, + ActorRef actorRef, RaftRPC rpc) throws Exception { + super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); + + String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null; + assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor()); + } + + @Override + protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef replyActor) + throws Exception { + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); } }