X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=9d64b140fb7d9db2c191eb12ffc847deb5273561;hb=b8da9f6fa8bf4284805349f4638ebdadf169ff5f;hp=51818d15b6475b02b4b58a82e570f9252069baa9;hpb=3fda1a923defdbf18849c6080c3aa19f1ebf2c5f;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 51818d15b6..9d64b140fb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -12,18 +12,22 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; + import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.TimeUnit; @@ -38,6 +42,7 @@ import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -45,10 +50,13 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerInfo; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import scala.concurrent.duration.FiniteDuration; -public class FollowerTest extends AbstractRaftActorBehaviorTest { +public class FollowerTest extends AbstractRaftActorBehaviorTest { private final TestActorRef followerActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("follower")); @@ -56,14 +64,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { private final TestActorRef leaderActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId("leader")); - private RaftActorBehavior follower; + private Follower follower; private final short payloadVersion = 5; @Override @After public void tearDown() throws Exception { - if(follower != null) { + if (follower != null) { follower.close(); } @@ -71,8 +79,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { - return new TestFollower(actorContext); + protected Follower createBehavior(RaftActorContext actorContext) { + return spy(new Follower(actorContext)); } @Override @@ -81,44 +89,71 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected MockRaftActorContext createActorContext(ActorRef actorRef){ + protected MockRaftActorContext createActorContext(ActorRef actorRef) { MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); context.setPayloadVersion(payloadVersion ); return context; } - private int getElectionTimeoutCount(RaftActorBehavior follower){ - if(follower instanceof TestFollower){ - return ((TestFollower) follower).getElectionTimeoutCount(); - } - return -1; - } - @Test - public void testThatAnElectionTimeoutIsTriggered(){ + public void testThatAnElectionTimeoutIsTriggered() { MockRaftActorContext actorContext = createActorContext(); follower = new Follower(actorContext); - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class, + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class, actorContext.getConfigParams().getElectionTimeOutInterval().$times(6).toMillis()); } @Test - public void testHandleElectionTimeout(){ - logStart("testHandleElectionTimeout"); + public void testHandleElectionTimeoutWhenNoLeaderMessageReceived() { + logStart("testHandleElectionTimeoutWhenNoLeaderMessageReceived"); - follower = new Follower(createActorContext()); + MockRaftActorContext context = createActorContext(); + follower = new Follower(context); - RaftActorBehavior raftBehavior = follower.handleMessage(followerActor, new ElectionTimeout()); + Uninterruptibles.sleepUninterruptibly(context.getConfigParams().getElectionTimeOutInterval().toMillis(), + TimeUnit.MILLISECONDS); + RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); assertTrue(raftBehavior instanceof Candidate); } @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull(){ + public void testHandleElectionTimeoutWhenLeaderMessageReceived() { + logStart("testHandleElectionTimeoutWhenLeaderMessageReceived"); + + MockRaftActorContext context = createActorContext(); + ((DefaultConfigParamsImpl) context.getConfigParams()) + .setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl) context.getConfigParams()).setElectionTimeoutFactor(4); + + follower = new Follower(context); + context.setCurrentBehavior(follower); + + Uninterruptibles.sleepUninterruptibly(context.getConfigParams() + .getElectionTimeOutInterval().toMillis() - 100, TimeUnit.MILLISECONDS); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + -1, -1, (short) 1)); + + Uninterruptibles.sleepUninterruptibly(130, TimeUnit.MILLISECONDS); + RaftActorBehavior raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); + assertTrue(raftBehavior instanceof Follower); + + Uninterruptibles.sleepUninterruptibly(context.getConfigParams() + .getElectionTimeOutInterval().toMillis() - 150, TimeUnit.MILLISECONDS); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", -1, -1, Collections.emptyList(), + -1, -1, (short) 1)); + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + raftBehavior = follower.handleMessage(leaderActor, ElectionTimeout.INSTANCE); + assertTrue(raftBehavior instanceof Follower); + } + + @Test + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull() { logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNull"); - RaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); long term = 1000; context.getTermInformation().update(term, null); @@ -130,14 +165,14 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("isVoteGranted", true, reply.isVoteGranted()); assertEquals("getTerm", term, reply.getTerm()); - assertEquals("schedule election", 1, getElectionTimeoutCount(follower)); + verify(follower).scheduleElection(any(FiniteDuration.class)); } @Test - public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId(){ + public void testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId() { logStart("testHandleRequestVoteWhenSenderTermEqualToCurrentTermAndVotedForIsNotTheSameAsCandidateId"); - RaftActorContext context = createActorContext(); + MockRaftActorContext context = createActorContext(); long term = 1000; context.getTermInformation().update(term, "test"); @@ -148,7 +183,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); assertEquals("isVoteGranted", false, reply.isVoteGranted()); - assertEquals("schedule election", 0, getElectionTimeoutCount(follower)); + verify(follower, never()).scheduleElection(any(FiniteDuration.class)); } @@ -172,7 +207,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -194,7 +230,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -202,7 +239,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception { + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() + throws Exception { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); @@ -219,7 +257,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -227,7 +266,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception { + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() + throws Exception { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); @@ -243,7 +283,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -251,8 +292,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception { - logStart("testHandleFirstAppendEntries"); + public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing() + throws Exception { + logStart( + "testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing"); MockRaftActorContext context = createActorContext(); context.getReplicatedLog().clear(0,2); @@ -267,7 +310,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -289,7 +333,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -339,7 +384,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -380,7 +426,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); - FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, + FollowerInitialSyncUpStatus.class); assertFalse(syncStatus.isInitialSyncDone()); @@ -430,8 +477,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * with a commitIndex that is greater than what has been applied to the * state machine of the RaftActor, the RaftActor applies the state and * sets it current applied state to the commitIndex of the sender. - * - * @throws Exception */ @Test public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception { @@ -460,8 +505,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * This test verifies that when an AppendEntries is received a specific prevLogTerm * which does not match the term that is in RaftActors log entry at prevLogIndex * then the RaftActor does not change it's state and it returns a failure. - * - * @throws Exception */ @Test public void testHandleAppendEntriesSenderPrevLogTermNotSameAsReceiverPrevLogTerm() { @@ -492,9 +535,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver match that the new * entries get added to the log and the log is incremented by the number of - * entries received in appendEntries - * - * @throws Exception + * entries received in appendEntries. */ @Test public void testHandleAppendEntriesAddNewEntries() { @@ -546,7 +587,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { * This test verifies that when a new AppendEntries message is received with * new entries and the logs of the sender and receiver are out-of-sync that * the log is first corrected by removing the out of sync entries from the - * log and then adding in the new entries sent with the AppendEntries message + * log and then adding in the new entries sent with the AppendEntries message. */ @Test public void testHandleAppendEntriesCorrectReceiverLogEntries() { @@ -638,10 +679,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleAppendEntriesPreviousLogEntryMissing(){ + public void testHandleAppendEntriesPreviousLogEntryMissing() { logStart("testHandleAppendEntriesPreviousLogEntryMissing"); - MockRaftActorContext context = createActorContext(); + final MockRaftActorContext context = createActorContext(); // Prepare the receivers log MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); @@ -708,7 +749,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleAppendEntriesAfterInstallingSnapshot(){ + public void testHandleAppendEntriesAfterInstallingSnapshot() { logStart("testHandleAppendAfterInstallingSnapshot"); MockRaftActorContext context = createActorContext(); @@ -741,8 +782,6 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { /** * This test verifies that when InstallSnapshot is received by * the follower its applied correctly. - * - * @throws Exception */ @Test public void testHandleInstallSnapshot() throws Exception { @@ -757,13 +796,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); int lastIncludedIndex = 1; int chunkIndex = 1; InstallSnapshot lastInstallSnapshot = null; - for(int i = 0; i < totalChunks; i++) { - ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + for (int i = 0; i < totalChunks; i++) { + byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize); lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, chunkData, chunkIndex, totalChunks); follower.handleMessage(leaderActor, lastInstallSnapshot); @@ -792,22 +831,20 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("InstallSnapshotReply count", totalChunks, replies.size()); chunkIndex = 1; - for(InstallSnapshotReply reply: replies) { + for (InstallSnapshotReply reply: replies) { assertEquals("getChunkIndex", chunkIndex++, reply.getChunkIndex()); assertEquals("getTerm", 1, reply.getTerm()); assertEquals("isSuccess", true, reply.isSuccess()); assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); } - assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); + assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker()); } /** * Verify that when an AppendEntries is sent to a follower during a snapshot install * the Follower short-circuits the processing of the AppendEntries message. - * - * @throws Exception */ @Test public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { @@ -820,37 +857,79 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { ByteString bsSnapshot = createSnapshot(); int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); int lastIncludedIndex = 1; // Check that snapshot installation is not in progress - assertNull(((Follower) follower).getSnapshotTracker()); + assertNull(follower.getSnapshotTracker()); // Make sure that we have more than 1 chunk to send assertTrue(totalChunks > 1); // Send an install snapshot with the first chunk to start the process of installing a snapshot - ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize); follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, chunkData, 1, totalChunks)); // Check if snapshot installation is in progress now - assertNotNull(((Follower) follower).getSnapshotTracker()); + assertNotNull(follower.getSnapshotTracker()); // Send an append entry - AppendEntries appendEntries = mock(AppendEntries.class); - doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm(); + AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1); follower.handleMessage(leaderActor, appendEntries); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); - assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); - assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm()); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm()); + + assertNotNull(follower.getSnapshotTracker()); + } + + @Test + public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader"); + + MockRaftActorContext context = createActorContext(); - // We should not hit the code that needs to look at prevLogIndex because we are short circuiting - verify(appendEntries, never()).getPrevLogIndex(); + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); + int lastIncludedIndex = 1; + + // Check that snapshot installation is not in progress + assertNull(follower.getSnapshotTracker()); + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(follower.getSnapshotTracker()); + + // Send appendEntries with a new term and leader. + AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", 2, reply.getLogLastIndex()); + assertEquals("getLogLastTerm", 2, reply.getLogLastTerm()); + assertEquals("getTerm", 2, reply.getTerm()); + + assertNull(follower.getSnapshotTracker()); } @Test @@ -858,6 +937,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { logStart("testInitialSyncUpWithHandleInstallSnapshot"); MockRaftActorContext context = createActorContext(); + context.setCommitIndex(-1); follower = createBehavior(context); @@ -865,13 +945,13 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); int lastIncludedIndex = 1; int chunkIndex = 1; InstallSnapshot lastInstallSnapshot = null; - for(int i = 0; i < totalChunks; i++) { - ByteString chunkData = getNextChunk(bsSnapshot, offset, chunkSize); + for (int i = 0; i < totalChunks; i++) { + byte[] chunkData = getNextChunk(bsSnapshot, offset, chunkSize); lastInstallSnapshot = new InstallSnapshot(1, "leader", lastIncludedIndex, 1, chunkData, chunkIndex, totalChunks); follower.handleMessage(leaderActor, lastInstallSnapshot); @@ -927,28 +1007,31 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getTerm", 1, reply.getTerm()); assertEquals("getFollowerId", context.getId(), reply.getFollowerId()); - assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); + assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker()); } @Test - public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers() { MockRaftActorContext context = createActorContext(); Stopwatch stopwatch = Stopwatch.createStarted(); follower = createBehavior(context); - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow); + assertTrue("Expected Candidate", newBehavior instanceof Candidate); } @Test - public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){ + public void testFollowerSchedulesElectionIfAutomaticElectionsAreDisabled() { MockRaftActorContext context = createActorContext(); - context.setConfigParams(new DefaultConfigParamsImpl(){ + context.setConfigParams(new DefaultConfigParamsImpl() { @Override public FiniteDuration getElectionTimeOutInterval() { return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); @@ -959,42 +1042,66 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500); + TimeoutNow timeoutNow = MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), timeoutNow); + assertSame("handleMessage result", follower, newBehavior); } @Test - public void testElectionScheduledWhenAnyRaftRPCReceived(){ + public void testFollowerSchedulesElectionIfNonVoting() { + MockRaftActorContext context = createActorContext(); + context.updatePeerIds(new ServerConfigurationPayload(Arrays.asList(new ServerInfo(context.getId(), false)))); + ((DefaultConfigParamsImpl)context.getConfigParams()).setHeartBeatInterval( + FiniteDuration.apply(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)context.getConfigParams()).setElectionTimeoutFactor(1); + + follower = new Follower(context, "leader", (short)1); + + ElectionTimeout electionTimeout = MessageCollectorActor.expectFirstMatching(followerActor, + ElectionTimeout.class); + RaftActorBehavior newBehavior = follower.handleMessage(ActorRef.noSender(), electionTimeout); + assertSame("handleMessage result", follower, newBehavior); + assertNull("Expected null leaderId", follower.getLeaderId()); + } + + @Test + public void testElectionScheduledWhenAnyRaftRPCReceived() { MockRaftActorContext context = createActorContext(); follower = createBehavior(context); follower.handleMessage(leaderActor, new RaftRPC() { + private static final long serialVersionUID = 1L; + @Override public long getTerm() { return 100; } }); - assertEquals("schedule election", 1, getElectionTimeoutCount(follower)); + verify(follower).scheduleElection(any(FiniteDuration.class)); } @Test - public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){ + public void testElectionNotScheduledWhenNonRaftRPCMessageReceived() { MockRaftActorContext context = createActorContext(); follower = createBehavior(context); follower.handleMessage(leaderActor, "non-raft-rpc"); - assertEquals("schedule election", 0, getElectionTimeoutCount(follower)); + verify(follower, never()).scheduleElection(any(FiniteDuration.class)); } - public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){ + public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) { int snapshotLength = bs.size(); int start = offset; int size = chunkSize; if (chunkSize > snapshotLength) { size = snapshotLength; } else { - if ((start + chunkSize) > snapshotLength) { + if (start + chunkSize > snapshotLength) { size = snapshotLength - start; } } - return bs.substring(start, start + size); + + byte[] nextChunk = new byte[size]; + bs.copyTo(nextChunk, start, 0, size); + return nextChunk; } private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, @@ -1019,12 +1126,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } - private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { - return new MockRaftActorContext.MockReplicatedLogEntry(term, index, + private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { + return new SimpleReplicatedLogEntry(index, term, new MockRaftActorContext.MockPayload(data)); } - private ByteString createSnapshot(){ + private ByteString createSnapshot() { HashMap followerSnapshot = new HashMap<>(); followerSnapshot.put("1", "A"); followerSnapshot.put("2", "B"); @@ -1034,37 +1141,18 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); - String expVotedFor = RequestVote.class.isInstance(rpc) ? ((RequestVote)rpc).getCandidateId() : null; + String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null; assertEquals("New votedFor", expVotedFor, actorContext.getTermInformation().getVotedFor()); } @Override - protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef replyActor) + protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef replyActor) throws Exception { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); assertEquals("isSuccess", true, reply.isSuccess()); } - - private static class TestFollower extends Follower { - - int electionTimeoutCount = 0; - - public TestFollower(RaftActorContext context) { - super(context); - } - - @Override - protected void scheduleElection(FiniteDuration interval) { - electionTimeoutCount++; - super.scheduleElection(interval); - } - - public int getElectionTimeoutCount() { - return electionTimeoutCount; - } - } }