X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=29fb613327f23b72755514ce6f2c256d447f8a83;hp=a72a7c43324922748cad1837df047000cb1a4013;hb=6eb0469b729fbe69dad58ce6223fc231669089c7;hpb=eccb8a1b65c15fa25650f33f1723ef2e46b745d6 diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index a72a7c4332..29fb613327 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,162 +1,298 @@ package org.opendaylight.controller.cluster.raft.behaviors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import akka.actor.ActorRef; import akka.actor.Props; -import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; import com.google.protobuf.ByteString; -import junit.framework.Assert; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import org.junit.After; +import org.junit.Assert; import org.junit.Test; -import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; -import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; +import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; -import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +public class FollowerTest extends AbstractRaftActorBehaviorTest { -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; + private final TestActorRef followerActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower")); -public class FollowerTest extends AbstractRaftActorBehaviorTest { + private final TestActorRef leaderActor = actorFactory.createTestActor( + Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader")); + + private RaftActorBehavior follower; - private final ActorRef followerActor = getSystem().actorOf(Props.create( - DoNothingActor.class)); + @Override + @After + public void tearDown() throws Exception { + if(follower != null) { + follower.close(); + } + super.tearDown(); + } - @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { + @Override + protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { return new Follower(actorContext); } - @Override protected RaftActorContext createActorContext() { + @Override + protected MockRaftActorContext createActorContext() { return createActorContext(followerActor); } - protected RaftActorContext createActorContext(ActorRef actorRef){ - return new MockRaftActorContext("test", getSystem(), actorRef); + @Override + protected MockRaftActorContext createActorContext(ActorRef actorRef){ + return new MockRaftActorContext("follower", getSystem(), actorRef); } @Test public void testThatAnElectionTimeoutIsTriggered(){ - new JavaTestKit(getSystem()) {{ - - new Within(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6)) { - protected void run() { - - Follower follower = new Follower(createActorContext(getTestActor())); - - final Boolean out = new ExpectMsg(DefaultConfigParamsImpl.HEART_BEAT_INTERVAL.$times(6), "ElectionTimeout") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof ElectionTimeout) { - return true; - } else { - throw noMatch(); - } - } - }.get(); - - assertEquals(true, out); - } - }; - }}; + MockRaftActorContext actorContext = createActorContext(); + follower = new Follower(actorContext); + + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class, + actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis()); } @Test public void testHandleElectionTimeout(){ - RaftActorContext raftActorContext = createActorContext(); - Follower follower = - new Follower(raftActorContext); + logStart("testHandleElectionTimeout"); + + follower = new Follower(createActorContext()); - RaftState raftState = - follower.handleMessage(followerActor, new ElectionTimeout()); + RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout()); - Assert.assertEquals(RaftState.Candidate, raftState); + assertTrue(raftBehavior instanceof Candidate); } @Test public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ - new JavaTestKit(getSystem()) {{ + logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull"); + + RaftActorContext context = createActorContext(); + long term = 1000; + context.getTermInformation().update(term, null); - new Within(duration("1 seconds")) { - protected void run() { + follower = createBehavior(context); - RaftActorContext context = createActorContext(getTestActor()); + follower.handleMessage(leaderActor, new RequestVote(term, "test", 10000, 999)); + + RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); + + assertEquals("isVoteGranted", true, reply.isVoteGranted()); + assertEquals("getTerm", term, reply.getTerm()); + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId"); - context.getTermInformation().update(1000, null); + RaftActorContext context = createActorContext(); + long term = 1000; + context.getTermInformation().update(term, "test"); - RaftActorBehavior follower = createBehavior(context); + follower = createBehavior(context); - follower.handleMessage(getTestActor(), new RequestVote(1000, "test", 10000, 999)); + follower.handleMessage(leaderActor, new RequestVote(term, "candidate", 10000, 999)); - final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof RequestVoteReply) { - RequestVoteReply reply = (RequestVoteReply) in; - return reply.isVoteGranted(); - } else { - throw noMatch(); - } - } - }.get(); + RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); - assertEquals(true, out); - } - }; - }}; + assertEquals("isVoteGranted", false, reply.isVoteGranted()); } + @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ - new JavaTestKit(getSystem()) {{ + public void testHandleFirstAppendEntries() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); - new Within(duration("1 seconds")) { - protected void run() { + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); - RaftActorContext context = createActorContext(getTestActor()); + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); - context.getTermInformation().update(1000, "test"); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); - RaftActorBehavior follower = createBehavior(context); + assertFalse(syncStatus.isInitialSyncDone()); + } + + @Test + public void testHandleSyncUpAppendEntries() throws Exception { + logStart("testHandleSyncUpAppendEntries"); - follower.handleMessage(getTestActor(), new RequestVote(1000, "candidate", 10000, 999)); + MockRaftActorContext context = createActorContext(); - final Boolean out = new ExpectMsg(duration("1 seconds"), "RequestVoteReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof RequestVoteReply) { - RequestVoteReply reply = (RequestVoteReply) in; - return reply.isVoteGranted(); - } else { - throw noMatch(); - } - } - }.get(); + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + followerActor.underlyingActor().clear(); + + // Sending the same message again should not generate another message + + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertNull(syncStatus); - assertEquals(false, out); - } - }; - }}; } + @Test + public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + + } + + + @Test + public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception { + logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertFalse(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); + + // Clear all the messages + followerActor.underlyingActor().clear(); + + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, + new MockRaftActorContext.MockPayload("")); + + entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // leader-2 is becoming the leader now and it says the commitIndex is 45 + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + // We get a new message saying initial status is not done + assertFalse(syncStatus.isInitialSyncDone()); + + } + + /** * This test verifies that when an AppendEntries RPC is received by a RaftActor * with a commitIndex that is greater than what has been applied to the @@ -167,32 +303,25 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { */ @Test public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { - new JavaTestKit(getSystem()) {{ + logStart("testHandleAppendEntriesWithNewerCommitIndex"); - RaftActorContext context = - createActorContext(); + MockRaftActorContext context = createActorContext(); - context.setLastApplied(100); - setLastLogEntry((MockRaftActorContext) context, 1, 100, + context.setLastApplied(100); + setLastLogEntry(context, 1, 100, new MockRaftActorContext.MockPayload("")); - ((MockRaftActorContext) context).getReplicatedLog().setSnapshotIndex(99); + context.getReplicatedLog().setSnapshotIndex(99); - List entries = - Arrays.asList( - (ReplicatedLogEntry) new MockRaftActorContext.MockReplicatedLogEntry(2, 101, - new MockRaftActorContext.MockPayload("foo")) - ); + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); - // The new commitIndex is 101 - AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 100, 1, entries, 101); + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); - RaftState raftState = - createBehavior(context).handleMessage(getRef(), appendEntries); + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); - assertEquals(101L, context.getLastApplied()); - - }}; + assertEquals("getLastApplied", 101L, context.getLastApplied()); } /** @@ -203,58 +332,30 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * @throws Exception */ @Test - public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() - throws Exception { - new JavaTestKit(getSystem()) {{ - - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); - - // First set the receivers term to lower number - context.getTermInformation().update(95, "test"); + public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() { + logStart("testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm"); - // Set the last log entry term for the receiver to be greater than - // what we will be sending as the prevLogTerm in AppendEntries - MockRaftActorContext.SimpleReplicatedLog mockReplicatedLog = - setLastLogEntry(context, 20, 0, new MockRaftActorContext.MockPayload("")); + MockRaftActorContext context = createActorContext(); - // AppendEntries is now sent with a bigger term - // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = - new AppendEntries(100, "leader-1", 0, 0, null, 101); + // First set the receivers term to lower number + context.getTermInformation().update(95, "test"); - RaftActorBehavior behavior = createBehavior(context); + // AppendEntries is now sent with a bigger term + // this will set the receivers term to be the same as the sender's term + AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1); - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); + follower = createBehavior(context); - RaftState raftState = - behavior.handleMessage(getRef(), appendEntries); + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); - assertEquals(expected, raftState); + Assert.assertSame(follower, newBehavior); - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); - assertEquals(false, out); - - - }}; + assertEquals("isSuccess", false, reply.isSuccess()); } - - /** * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver match that the new @@ -264,165 +365,201 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * @throws Exception */ @Test - public void testHandleAppendEntriesAddNewEntries() throws Exception { - new JavaTestKit(getSystem()) {{ - - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); - - // First set the receivers term to lower number - context.getTermInformation().update(1, "test"); - - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); - - context.setReplicatedLog(log); - - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 3, new MockRaftActorContext.MockPayload("three"))); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("four"))); - - // Send appendEntries with the same term as was set on the receiver - // before the new behavior was created (1 in this case) - // This will not work for a Candidate because as soon as a Candidate - // is created it increments the term - AppendEntries appendEntries = - new AppendEntries(1, "leader-1", 2, 1, entries, 4); - - RaftActorBehavior behavior = createBehavior(context); - - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); - - RaftState raftState = - behavior.handleMessage(getRef(), appendEntries); - - assertEquals(expected, raftState); - assertEquals(5, log.last().getIndex() + 1); - assertNotNull(log.get(3)); - assertNotNull(log.get(4)); - - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); - - assertEquals(true, out); - - - }}; - } + public void testHandleAppendEntriesAddNewEntries() { + logStart("testHandleAppendEntriesAddNewEntries"); + + MockRaftActorContext context = createActorContext(); + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 3, "three")); + entries.add(newReplicatedLogEntry(1, 4, "four")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1); + + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + assertEquals("Next index", 5, log.last().getIndex() + 1); + assertEquals("Entry 3", entries.get(0), log.get(3)); + assertEquals("Entry 4", entries.get(1), log.get(4)); + + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); + } /** * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver are out-of-sync that * the log is first corrected by removing the out of sync entries from the * log and then adding in the new entries sent with the AppendEntries message - * - * @throws Exception */ @Test - public void testHandleAppendEntriesCorrectReceiverLogEntries() - throws Exception { - new JavaTestKit(getSystem()) {{ - - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(); - - // First set the receivers term to lower number - context.getTermInformation().update(2, "test"); - - // Prepare the receivers log - MockRaftActorContext.SimpleReplicatedLog log = - new MockRaftActorContext.SimpleReplicatedLog(); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one"))); - log.append( - new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two"))); - - context.setReplicatedLog(log); - - // Prepare the entries to be sent with AppendEntries - List entries = new ArrayList<>(); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(2, 2, new MockRaftActorContext.MockPayload("two-1"))); - entries.add( - new MockRaftActorContext.MockReplicatedLogEntry(2, 3, new MockRaftActorContext.MockPayload("three"))); - - // Send appendEntries with the same term as was set on the receiver - // before the new behavior was created (1 in this case) - // This will not work for a Candidate because as soon as a Candidate - // is created it increments the term - AppendEntries appendEntries = - new AppendEntries(2, "leader-1", 1, 1, entries, 3); - - RaftActorBehavior behavior = createBehavior(context); - - // Send an unknown message so that the state of the RaftActor remains unchanged - RaftState expected = behavior.handleMessage(getRef(), "unknown"); - - RaftState raftState = - behavior.handleMessage(getRef(), appendEntries); - - assertEquals(expected, raftState); - - // The entry at index 2 will be found out-of-sync with the leader - // and will be removed - // Then the two new entries will be added to the log - // Thus making the log to have 4 entries - assertEquals(4, log.last().getIndex() + 1); - assertNotNull(log.get(2)); - - assertEquals("one", log.get(1).getData().toString()); - - // Check that the entry at index 2 has the new data - assertEquals("two-1", log.get(2).getData().toString()); - - assertEquals("three", log.get(3).getData().toString()); - - assertNotNull(log.get(3)); - - // Also expect an AppendEntriesReply to be sent where success is false - final Boolean out = new ExpectMsg(duration("1 seconds"), - "AppendEntriesReply") { - // do not put code outside this method, will run afterwards - protected Boolean match(Object in) { - if (in instanceof AppendEntriesReply) { - AppendEntriesReply reply = (AppendEntriesReply) in; - return reply.isSuccess(); - } else { - throw noMatch(); - } - } - }.get(); - - assertEquals(true, out); - - - }}; + public void testHandleAppendEntriesCorrectReceiverLogEntries() { + logStart("testHandleAppendEntriesCorrectReceiverLogEntries"); + + MockRaftActorContext context = createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(2, 2, "two-1")); + entries.add(newReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1); + + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + // The entry at index 2 will be found out-of-sync with the leader + // and will be removed + // Then the two new entries will be added to the log + // Thus making the log to have 4 entries + assertEquals("Next index", 4, log.last().getIndex() + 1); + //assertEquals("Entry 2", entries.get(0), log.get(2)); + + assertEquals("Entry 1 data", "one", log.get(1).getData().toString()); + + // Check that the entry at index 2 has the new data + assertEquals("Entry 2", entries.get(0), log.get(2)); + + assertEquals("Entry 3", entries.get(1), log.get(3)); + + expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3); + } + + @Test + public void testHandleAppendEntriesPreviousLogEntryMissing(){ + logStart("testHandleAppendEntriesPreviousLogEntryMissing"); + + MockRaftActorContext context = createActorContext(); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 4, "four")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1); + + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(1, false, context.getId(), 1, 2); + } + + @Test + public void testHandleAppendEntriesWithExistingLogEntry() { + logStart("testHandleAppendEntriesWithExistingLogEntry"); + + MockRaftActorContext context = createActorContext(); + + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + + context.setReplicatedLog(log); + + // Send the last entry again. + List entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one")); + + follower = createBehavior(context); + + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1)); + + assertEquals("Next index", 2, log.last().getIndex() + 1); + assertEquals("Entry 1", entries.get(0), log.get(1)); + + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 1); + + // Send the last entry again and also a new one. + + entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two")); + + leaderActor.underlyingActor().clear(); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1)); + + assertEquals("Next index", 3, log.last().getIndex() + 1); + assertEquals("Entry 1", entries.get(0), log.get(1)); + assertEquals("Entry 2", entries.get(1), log.get(2)); + + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 2); + } + + @Test + public void testHandleAppendEntriesAfterInstallingSnapshot(){ + logStart("testHandleAppendAfterInstallingSnapshot"); + + MockRaftActorContext context = createActorContext(); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + + // Set up a log as if it has been snapshotted + log.setSnapshotIndex(3); + log.setSnapshotTerm(1); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(1, 4, "four")); + + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3); + + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } @@ -434,131 +571,194 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { */ @Test public void testHandleInstallSnapshot() throws Exception { - JavaTestKit javaTestKit = new JavaTestKit(getSystem()) {{ - - ActorRef leaderActor = getSystem().actorOf(Props.create( - MessageCollectorActor.class)); - - MockRaftActorContext context = (MockRaftActorContext) - createActorContext(getRef()); - - Follower follower = (Follower)createBehavior(context); - - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); - ByteString chunkData = ByteString.EMPTY; - int offset = 0; - int snapshotLength = bsSnapshot.size(); - int i = 1; - - do { - chunkData = getNextChunk(bsSnapshot, offset); - final InstallSnapshot installSnapshot = - new InstallSnapshot(1, "leader-1", i, 1, - chunkData, i, 3); - follower.handleMessage(leaderActor, installSnapshot); - offset = offset + 50; - i++; - } while ((offset+50) < snapshotLength); - - final InstallSnapshot installSnapshot3 = new InstallSnapshot(1, "leader-1", 3, 1, chunkData, 3, 3); - follower.handleMessage(leaderActor, installSnapshot3); - - String[] matches = new ReceiveWhile(String.class, duration("2 seconds")) { - @Override - protected String match(Object o) throws Exception { - if (o instanceof ApplySnapshot) { - ApplySnapshot as = (ApplySnapshot)o; - if (as.getSnapshot().getLastIndex() != installSnapshot3.getLastIncludedIndex()) { - return "applySnapshot-lastIndex-mismatch"; - } - if (as.getSnapshot().getLastAppliedTerm() != installSnapshot3.getLastIncludedTerm()) { - return "applySnapshot-lastAppliedTerm-mismatch"; - } - if (as.getSnapshot().getLastAppliedIndex() != installSnapshot3.getLastIncludedIndex()) { - return "applySnapshot-lastAppliedIndex-mismatch"; - } - if (as.getSnapshot().getLastTerm() != installSnapshot3.getLastIncludedTerm()) { - return "applySnapshot-lastTerm-mismatch"; - } - return "applySnapshot"; - } - - return "ignoreCase"; - } - }.get(); - - String applySnapshotMatch = ""; - for (String reply: matches) { - if (reply.startsWith("applySnapshot")) { - applySnapshotMatch = reply; - } - } + logStart("testHandleInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + ByteString bsSnapshot = toByteString(followerSnapshot); + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int lastIncludedIndex = 1; + int chunkIndex = 1; + InstallSnapshot lastInstallSnapshot = null; + + for(int i = 0; i < totalChunks; i++) { + ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, chunkIndex, totalChunks); + follower.handleMessage(leaderActor, lastInstallSnapshot); + offset = offset + 50; + lastIncludedIndex++; + chunkIndex++; + } - assertEquals("applySnapshot", applySnapshotMatch); + ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + ApplySnapshot.class); + Snapshot snapshot = applySnapshot.getSnapshot(); + assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex()); + assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(), + snapshot.getLastAppliedTerm()); + assertEquals("getLastAppliedIndex", lastInstallSnapshot.getLastIncludedIndex(), + snapshot.getLastAppliedIndex()); + assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); + Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + + List replies = MessageCollectorActor.getAllMatching( + leaderActor, InstallSnapshotReply.class); + assertEquals("InstallSnapshotReply count", totalChunks, replies.size()); + + chunkIndex = 1; + for(InstallSnapshotReply reply: replies) { + assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex()); + assertEquals("getTerm", 1, reply.getTerm()); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); + } + + assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); + } - Object messages = executeLocalOperation(leaderActor, "get-all-messages"); + @Test + public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { + logStart("testInitialSyncUpWithHandleInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + ByteString bsSnapshot = toByteString(followerSnapshot); + int offset = 0; + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int lastIncludedIndex = 1; + int chunkIndex = 1; + InstallSnapshot lastInstallSnapshot = null; + + for(int i = 0; i < totalChunks; i++) { + ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, chunkIndex, totalChunks); + follower.handleMessage(leaderActor, lastInstallSnapshot); + offset = offset + 50; + lastIncludedIndex++; + chunkIndex++; + } - assertNotNull(messages); - assertTrue(messages instanceof List); - List listMessages = (List) messages; + FollowerInitialSyncUpStatus syncStatus = + MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); - int installSnapshotReplyReceivedCount = 0; - for (Object message: listMessages) { - if (message instanceof InstallSnapshotReply) { - ++installSnapshotReplyReceivedCount; - } - } + assertFalse(syncStatus.isInitialSyncDone()); - assertEquals(3, installSnapshotReplyReceivedCount); + // Clear all the messages + followerActor.underlyingActor().clear(); - }}; + context.setLastApplied(101); + context.setCommitIndex(101); + setLastLogEntry(context, 1, 101, + new MockRaftActorContext.MockPayload("")); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101); + follower.handleMessage(leaderActor, appendEntries); + + syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + + assertTrue(syncStatus.isInitialSyncDone()); } - public Object executeLocalOperation(ActorRef actor, Object message) throws Exception { - return MessageCollectorActor.getAllMessages(actor); + @Test + public void testHandleOutOfSequenceInstallSnapshot() throws Exception { + logStart("testHandleOutOfSequenceInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + ByteString bsSnapshot = toByteString(followerSnapshot); + + InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1, + getNextChunk(bsSnapshot, 10, 50), 3, 3); + follower.handleMessage(leaderActor, installSnapshot); + + InstallSnapshotReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + InstallSnapshotReply.class); + + assertEquals("isSuccess", false, reply.isSuccess()); + assertEquals("getChunkIndex", -1, reply.getChunkIndex()); + assertEquals("getTerm", 1, reply.getTerm()); + assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); + + assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); } - public ByteString getNextChunk (ByteString bs, int offset){ + public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){ int snapshotLength = bs.size(); int start = offset; - int size = 50; - if (50 > snapshotLength) { + int size = chunkSize; + if (chunkSize > snapshotLength) { size = snapshotLength; } else { - if ((start + 50) > snapshotLength) { + if ((start + chunkSize) > snapshotLength) { size = snapshotLength - start; } } return bs.substring(start, start + size); } - private ByteString toByteString(Map state) { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(state); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } catch (IOException e) { - org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e); - } - return null; + private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, + String expFollowerId, long expLogLastTerm, long expLogLastIndex) { + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); + + assertEquals("isSuccess", expSuccess, reply.isSuccess()); + assertEquals("getTerm", expTerm, reply.getTerm()); + assertEquals("getFollowerId", expFollowerId, reply.getFollowerId()); + assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm()); + assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); + } + + private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { + return new MockRaftActorContext.MockReplicatedLogEntry(term, index, + new MockRaftActorContext.MockPayload(data)); + } + + @Override + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + ActorRef actorRef, RaftRPC rpc) throws Exception { + super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); + + String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null; + assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor()); + } + + @Override + protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef replyActor) + throws Exception { + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); } }