X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FFollowerTest.java;h=1b15ecb135a89f87c212ecf3a080cf8181ba921e;hb=224aa4f574c63576961dc9dc37e075e2e5096a5a;hp=29fb613327f23b72755514ce6f2c256d447f8a83;hpb=a8469dc0d7cacf28a09c24074bd48f380a2b8409;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index 29fb613327..1b15ecb135 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -1,20 +1,36 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import akka.actor.ActorRef; import akka.actor.Props; import akka.testkit.TestActorRef; +import com.google.common.base.Stopwatch; import com.google.protobuf.ByteString; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; import org.opendaylight.controller.cluster.raft.RaftActorContext; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -30,6 +46,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import scala.concurrent.duration.FiniteDuration; public class FollowerTest extends AbstractRaftActorBehaviorTest { @@ -41,6 +58,8 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { private RaftActorBehavior follower; + private final short payloadVersion = 5; + @Override @After public void tearDown() throws Exception { @@ -53,7 +72,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override protected RaftActorBehavior createBehavior(RaftActorContext actorContext) { - return new Follower(actorContext); + return new TestFollower(actorContext); } @Override @@ -63,7 +82,16 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { @Override protected MockRaftActorContext createActorContext(ActorRef actorRef){ - return new MockRaftActorContext("follower", getSystem(), actorRef); + MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef); + context.setPayloadVersion(payloadVersion ); + return context; + } + + private static int getElectionTimeoutCount(RaftActorBehavior follower){ + if(follower instanceof TestFollower){ + return ((TestFollower) follower).getElectionTimeoutCount(); + } + return -1; } @Test @@ -102,6 +130,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("isVoteGranted", true, reply.isVoteGranted()); assertEquals("getTerm", term, reply.getTerm()); + assertEquals("schedule election", 1, getElectionTimeoutCount(follower)); } @Test @@ -119,6 +148,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { RequestVoteReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, RequestVoteReply.class); assertEquals("isVoteGranted", false, reply.isVoteGranted()); + assertEquals("schedule election", 0, getElectionTimeoutCount(follower)); } @@ -127,19 +157,121 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { logStart("testHandleFirstAppendEntries"); MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1,100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + Assert.assertEquals(1, context.getReplicatedLog().size()); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().append(newReplicatedLogEntry(1, 100, "bar")); + context.getReplicatedLog().setSnapshotIndex(99); List entries = Arrays.asList( newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 101, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 101, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertTrue("append entries reply should be true", reply.isSuccess()); + } + + @Test + public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshotButCalculatedPreviousEntryMissing() throws Exception { + logStart("testHandleFirstAppendEntries"); + + MockRaftActorContext context = createActorContext(); + context.getReplicatedLog().clear(0,2); + context.getReplicatedLog().setSnapshotIndex(100); + + List entries = Arrays.asList( + newReplicatedLogEntry(2, 105, "foo")); + + // The new commitIndex is 101 + AppendEntries appendEntries = new AppendEntries(2, "leader-1", -1, -1, entries, 105, 100, (short) 0); + + follower = createBehavior(context); + follower.handleMessage(leaderActor, appendEntries); + + FollowerInitialSyncUpStatus syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + + assertFalse(syncStatus.isInitialSyncDone()); + assertFalse("append entries reply should be false", reply.isSuccess()); } @Test @@ -152,7 +284,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -173,7 +305,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101); + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -202,7 +334,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -222,7 +354,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // leader-2 is becoming the leader now and it says the commitIndex is 45 - appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100); + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -243,7 +375,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -264,7 +396,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101); + appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -282,7 +414,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // leader-2 is becoming the leader now and it says the commitIndex is 45 - appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100); + appendEntries = new AppendEntries(2, "leader-2", 45, 1, entries, 46, 100, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -316,7 +448,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100); + AppendEntries appendEntries = new AppendEntries(2, "leader-1", 100, 1, entries, 101, 100, (short)0); follower = createBehavior(context); follower.handleMessage(leaderActor, appendEntries); @@ -342,7 +474,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // AppendEntries is now sent with a bigger term // this will set the receivers term to be the same as the sender's term - AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1); + AppendEntries appendEntries = new AppendEntries(100, "leader", 0, 0, null, 101, -1, (short)0); follower = createBehavior(context); @@ -390,7 +522,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // before the new behavior was created (1 in this case) // This will not work for a Candidate because as soon as a Candidate // is created it increments the term - AppendEntries appendEntries = new AppendEntries(1, "leader-1", 2, 1, entries, 4, -1); + short leaderPayloadVersion = 10; + String leaderId = "leader-1"; + AppendEntries appendEntries = new AppendEntries(1, leaderId, 2, 1, entries, 4, -1, leaderPayloadVersion); follower = createBehavior(context); @@ -402,6 +536,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("Entry 3", entries.get(0), log.get(3)); assertEquals("Entry 4", entries.get(1), log.get(4)); + assertEquals("getLeaderPayloadVersion", leaderPayloadVersion, newBehavior.getLeaderPayloadVersion()); + assertEquals("getLeaderId", leaderId, newBehavior.getLeaderId()); + expectAndVerifyAppendEntriesReply(1, true, context.getId(), 1, 4); } @@ -437,7 +574,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { // before the new behavior was created (1 in this case) // This will not work for a Candidate because as soon as a Candidate // is created it increments the term - AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1); + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); follower = createBehavior(context); @@ -462,6 +599,44 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3); } + @Test + public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() { + logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + // First set the receivers term to lower number + context.getTermInformation().update(1, "test"); + + // Prepare the receivers log + MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog(); + log.append(newReplicatedLogEntry(1, 0, "zero")); + log.append(newReplicatedLogEntry(1, 1, "one")); + log.append(newReplicatedLogEntry(1, 2, "two")); + + context.setReplicatedLog(log); + + // Prepare the entries to be sent with AppendEntries + List entries = new ArrayList<>(); + entries.add(newReplicatedLogEntry(2, 2, "two-1")); + entries.add(newReplicatedLogEntry(2, 3, "three")); + + // Send appendEntries with the same term as was set on the receiver + // before the new behavior was created (1 in this case) + // This will not work for a Candidate because as soon as a Candidate + // is created it increments the term + AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0); + + context.setRaftPolicy(createRaftPolicy(false, true)); + follower = createBehavior(context); + + RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries); + + Assert.assertSame(follower, newBehavior); + + expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true); + } + @Test public void testHandleAppendEntriesPreviousLogEntryMissing(){ logStart("testHandleAppendEntriesPreviousLogEntryMissing"); @@ -480,7 +655,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { List entries = new ArrayList<>(); entries.add(newReplicatedLogEntry(1, 4, "four")); - AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1); + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, -1, (short)0); follower = createBehavior(context); @@ -511,7 +686,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1)); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 1, -1, (short)0)); assertEquals("Next index", 2, log.last().getIndex() + 1); assertEquals("Entry 1", entries.get(0), log.get(1)); @@ -523,7 +698,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { entries = Arrays.asList(newReplicatedLogEntry(1, 1, "one"), newReplicatedLogEntry(1, 2, "two")); leaderActor.underlyingActor().clear(); - follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1)); + follower.handleMessage(leaderActor, new AppendEntries(1, "leader", 0, 1, entries, 2, -1, (short)0)); assertEquals("Next index", 3, log.last().getIndex() + 1); assertEquals("Entry 1", entries.get(0), log.get(1)); @@ -551,7 +726,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { List entries = new ArrayList<>(); entries.add(newReplicatedLogEntry(1, 4, "four")); - AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3); + AppendEntries appendEntries = new AppendEntries(1, "leader", 3, 1, entries, 4, 3, (short)0); follower = createBehavior(context); @@ -574,15 +749,11 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { logStart("testHandleInstallSnapshot"); MockRaftActorContext context = createActorContext(); + context.getTermInformation().update(1, "leader"); follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -604,6 +775,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { ApplySnapshot applySnapshot = MessageCollectorActor.expectFirstMatching(followerActor, ApplySnapshot.class); Snapshot snapshot = applySnapshot.getSnapshot(); + assertNotNull(lastInstallSnapshot); assertEquals("getLastIndex", lastInstallSnapshot.getLastIncludedIndex(), snapshot.getLastIndex()); assertEquals("getLastIncludedTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastAppliedTerm()); @@ -611,6 +783,9 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { snapshot.getLastAppliedIndex()); assertEquals("getLastTerm", lastInstallSnapshot.getLastIncludedTerm(), snapshot.getLastTerm()); Assert.assertArrayEquals("getState", bsSnapshot.toByteArray(), snapshot.getState()); + assertEquals("getElectionTerm", 1, snapshot.getElectionTerm()); + assertEquals("getElectionVotedFor", "leader", snapshot.getElectionVotedFor()); + applySnapshot.getCallback().onSuccess(); List replies = MessageCollectorActor.getAllMatching( leaderActor, InstallSnapshotReply.class); @@ -627,6 +802,57 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); } + + /** + * Verify that when an AppendEntries is sent to a follower during a snapshot install + * the Follower short-circuits the processing of the AppendEntries message. + * + * @throws Exception + */ + @Test + public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshot"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); + + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int lastIncludedIndex = 1; + + // Check that snapshot installation is not in progress + assertNull(((Follower) follower).getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + ByteString chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(((Follower) follower).getSnapshotTracker()); + + // Send an append entry + AppendEntries appendEntries = mock(AppendEntries.class); + doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm(); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm()); + + // We should not hit the code that needs to look at prevLogIndex because we are short circuiting + verify(appendEntries, never()).getPrevLogIndex(); + + } + @Test public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception { logStart("testInitialSyncUpWithHandleInstallSnapshot"); @@ -635,12 +861,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; @@ -676,7 +897,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { newReplicatedLogEntry(2, 101, "foo")); // The new commitIndex is 101 - AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101); + AppendEntries appendEntries = new AppendEntries(2, "leader", 101, 1, entries, 102, 101, (short)0); follower.handleMessage(leaderActor, appendEntries); syncStatus = MessageCollectorActor.expectFirstMatching(followerActor, FollowerInitialSyncUpStatus.class); @@ -692,12 +913,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { follower = createBehavior(context); - HashMap followerSnapshot = new HashMap<>(); - followerSnapshot.put("1", "A"); - followerSnapshot.put("2", "B"); - followerSnapshot.put("3", "C"); - - ByteString bsSnapshot = toByteString(followerSnapshot); + ByteString bsSnapshot = createSnapshot(); InstallSnapshot installSnapshot = new InstallSnapshot(1, "leader", 3, 1, getNextChunk(bsSnapshot, 10, 50), 3, 3); @@ -714,6 +930,59 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNull("Expected null SnapshotTracker", ((Follower) follower).getSnapshotTracker()); } + @Test + public void testFollowerSchedulesElectionTimeoutImmediatelyWhenItHasNoPeers(){ + MockRaftActorContext context = createActorContext(); + + Stopwatch stopwatch = Stopwatch.createStarted(); + + follower = createBehavior(context); + + MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + + long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); + + assertTrue(elapsed < context.getConfigParams().getElectionTimeOutInterval().toMillis()); + } + + @Test + public void testFollowerDoesNotScheduleAnElectionIfAutomaticElectionsAreDisabled(){ + MockRaftActorContext context = createActorContext(); + context.setConfigParams(new DefaultConfigParamsImpl(){ + @Override + public FiniteDuration getElectionTimeOutInterval() { + return FiniteDuration.apply(100, TimeUnit.MILLISECONDS); + } + }); + + context.setRaftPolicy(createRaftPolicy(false, false)); + + follower = createBehavior(context); + + MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 500); + } + + @Test + public void testElectionScheduledWhenAnyRaftRPCReceived(){ + MockRaftActorContext context = createActorContext(); + follower = createBehavior(context); + follower.handleMessage(leaderActor, new RaftRPC() { + @Override + public long getTerm() { + return 100; + } + }); + assertEquals("schedule election", 1, getElectionTimeoutCount(follower)); + } + + @Test + public void testElectionNotScheduledWhenNonRaftRPCMessageReceived(){ + MockRaftActorContext context = createActorContext(); + follower = createBehavior(context); + follower.handleMessage(leaderActor, "non-raft-rpc"); + assertEquals("schedule election", 0, getElectionTimeoutCount(follower)); + } + public ByteString getNextChunk (ByteString bs, int offset, int chunkSize){ int snapshotLength = bs.size(); int start = offset; @@ -730,6 +999,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, String expFollowerId, long expLogLastTerm, long expLogLastIndex) { + expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false); + } + + private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess, + String expFollowerId, long expLogLastTerm, long expLogLastIndex, + boolean expForceInstallSnapshot) { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); @@ -739,13 +1014,25 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertEquals("getFollowerId", expFollowerId, reply.getFollowerId()); assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm()); assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex()); + assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion()); + assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot()); } - private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { + + private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) { return new MockRaftActorContext.MockReplicatedLogEntry(term, index, new MockRaftActorContext.MockPayload(data)); } + private ByteString createSnapshot(){ + HashMap followerSnapshot = new HashMap<>(); + followerSnapshot.put("1", "A"); + followerSnapshot.put("2", "B"); + followerSnapshot.put("3", "C"); + + return toByteString(followerSnapshot); + } + @Override protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { @@ -761,4 +1048,23 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class); assertEquals("isSuccess", true, reply.isSuccess()); } + + private static class TestFollower extends Follower { + + int electionTimeoutCount = 0; + + public TestFollower(RaftActorContext context) { + super(context); + } + + @Override + protected void scheduleElection(FiniteDuration interval) { + electionTimeoutCount++; + super.scheduleElection(interval); + } + + public int getElectionTimeoutCount() { + return electionTimeoutCount; + } + } }