X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=c3d33e12ec41ae744079bf864a4ab7d769f632f8;hp=b0d220a0d6fbd31db20992b2abd011bb20196d86;hb=8049fd4d06da0f4616180e46fbbe95f98cf698ea;hpb=23049d1bb77b0eb8d115ad7c485c09a2367800df diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index b0d220a0d6..c3d33e12ec 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -9,28 +9,37 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; + import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.OutputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; -import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; @@ -40,31 +49,33 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; -import org.opendaylight.controller.cluster.raft.SerializationUtils; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; -import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.FollowerToSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; +import org.opendaylight.yangtools.concepts.Identifier; import scala.concurrent.duration.FiniteDuration; -public class LeaderTest extends AbstractLeaderTest { +public class LeaderTest extends AbstractLeaderTest { static final String FOLLOWER_ID = "follower"; public static final String LEADER_ID = "leader"; @@ -81,7 +92,7 @@ public class LeaderTest extends AbstractLeaderTest { @Override @After public void tearDown() throws Exception { - if(leader != null) { + if (leader != null) { leader.close(); } @@ -94,10 +105,8 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(createActorContext()); - // handle message should return the Leader state when it receives an - // unknown message - RaftActorBehavior behavior = leader.handleMessage(followerActor, "foo"); - Assert.assertTrue(behavior instanceof Leader); + // handle message should null when it receives an unknown message + assertNull(leader.handleMessage(followerActor, "foo")); } @Test @@ -105,16 +114,17 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); - short payloadVersion = (short)5; + actorContext.setCommitIndex(-1); actorContext.setPayloadVersion(payloadVersion); long term = 1; actorContext.getTermInformation().update(term, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader should send an immediate heartbeat with no entries as follower is inactive. - long lastIndex = actorContext.getReplicatedLog().lastIndex(); + final long lastIndex = actorContext.getReplicatedLog().lastIndex(); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("getTerm", term, appendEntries.getTerm()); assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); @@ -130,8 +140,8 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); // Sleep for the heartbeat interval so AppendEntries is sent. - Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams(). - getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); @@ -145,16 +155,15 @@ public class LeaderTest extends AbstractLeaderTest { } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index){ + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) { return sendReplicate(actorContext, 1, index); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index){ + private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - term, index, payload); + SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); actorContext.getReplicatedLog().append(newEntry); - return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); } @Test @@ -199,6 +208,8 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); MockRaftActorContext actorContext = createActorContextWithFollower(); + actorContext.setCommitIndex(-1); + actorContext.setLastApplied(-1); // The raft context is initialized with a couple log entries. However the commitIndex // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't @@ -208,6 +219,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(newTerm, ""); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader will send an immediate heartbeat - ignore it. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -281,7 +293,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); - assertEquals("Commit Index", lastIndex+1, actorContext.getCommitIndex()); + assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex()); } @Test @@ -312,8 +324,8 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); - for(int i=0;i<5;i++) { - sendReplicate(actorContext, lastIndex+i+1); + for (int i = 0; i < 5; i++) { + sendReplicate(actorContext, lastIndex + i + 1); } List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); @@ -352,14 +364,14 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); - for(int i=0;i<3;i++) { - sendReplicate(actorContext, lastIndex+i+1); + for (int i = 0; i < 3; i++) { + sendReplicate(actorContext, lastIndex + i + 1); leader.handleMessage(followerActor, new AppendEntriesReply( FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); } - for(int i=3;i<5;i++) { + for (int i = 3; i < 5; i++) { sendReplicate(actorContext, lastIndex + i + 1); } @@ -368,9 +380,9 @@ public class LeaderTest extends AbstractLeaderTest { // get sent to the follower - but not the 5th assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); - for(int i=0;i<4;i++) { + for (int i = 0; i < 4; i++) { long expected = allMessages.get(i).getEntries().get(0).getIndex(); - assertEquals(expected, i+2); + assertEquals(expected, i + 2); } } @@ -402,7 +414,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); - sendReplicate(actorContext, lastIndex+1); + sendReplicate(actorContext, lastIndex + 1); // Wait slightly longer than heartbeat duration Uninterruptibles.sleepUninterruptibly(750, TimeUnit.MILLISECONDS); @@ -413,9 +425,9 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); assertEquals(1, allMessages.get(0).getEntries().size()); - assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex()); assertEquals(1, allMessages.get(1).getEntries().size()); - assertEquals(lastIndex+1, allMessages.get(0).getEntries().get(0).getIndex()); + assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex()); } @@ -447,7 +459,7 @@ public class LeaderTest extends AbstractLeaderTest { followerActor.underlyingActor().clear(); - for(int i=0;i<3;i++) { + for (int i = 0; i < 3; i++) { Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); } @@ -486,7 +498,7 @@ public class LeaderTest extends AbstractLeaderTest { Uninterruptibles.sleepUninterruptibly(150, TimeUnit.MILLISECONDS); leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); - sendReplicate(actorContext, lastIndex+1); + sendReplicate(actorContext, lastIndex + 1); List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); @@ -508,13 +520,14 @@ public class LeaderTest extends AbstractLeaderTest { long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; long term = actorContext.getTermInformation().getCurrentTerm(); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - term, newLogIndex, new MockRaftActorContext.MockPayload("foo")); + ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry( + newLogIndex, term, new MockRaftActorContext.MockPayload("foo")); actorContext.getReplicatedLog().append(newEntry); + final Identifier id = new MockIdentifier("state-id"); RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(leaderActor, "state-id", newEntry)); + new Replicate(leaderActor, id, newEntry, true)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -527,7 +540,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActor, ApplyState.class); assertEquals("ApplyState count", newLogIndex, applyStateList.size()); - for(int i = 0; i <= newLogIndex - 1; i++ ) { + for (int i = 0; i <= newLogIndex - 1; i++) { ApplyState applyState = applyStateList.get(i); assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex()); assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm()); @@ -535,14 +548,14 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState last = applyStateList.get((int) newLogIndex - 1); assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); - assertEquals("getIdentifier", "state-id", last.getIdentifier()); + assertEquals("getIdentifier", id, last.getIdentifier()); } @Test public void testSendAppendEntriesOnAnInProgressInstallSnapshot() throws Exception { logStart("testSendAppendEntriesOnAnInProgressInstallSnapshot"); - MockRaftActorContext actorContext = createActorContextWithFollower(); + final MockRaftActorContext actorContext = createActorContextWithFollower(); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -554,9 +567,7 @@ public class LeaderTest extends AbstractLeaderTest { final int commitIndex = 3; final int snapshotIndex = 2; - final int newEntryIndex = 4; final int snapshotTerm = 1; - final int currentTerm = 2; // set the snapshot variables in replicatedlog actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex); @@ -570,19 +581,17 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); - // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm)); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); + leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); + leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); //send first chunk and no InstallSnapshotReply received yet fts.getNextChunk(); @@ -593,9 +602,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); - AppendEntries aeproto = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - - AppendEntries ae = (AppendEntries) SerializationUtils.fromSerializable(aeproto); + AppendEntries ae = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertTrue("AppendEntries should be sent with empty entries", ae.getEntries().isEmpty()); @@ -613,7 +620,7 @@ public class LeaderTest extends AbstractLeaderTest { public void testSendAppendEntriesSnapshotScenario() throws Exception { logStart("testSendAppendEntriesSnapshotScenario"); - MockRaftActorContext actorContext = createActorContextWithFollower(); + final MockRaftActorContext actorContext = createActorContextWithFollower(); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -640,8 +647,8 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // new entry - ReplicatedLogImplEntry entry = - new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + SimpleReplicatedLogEntry entry = + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); actorContext.getReplicatedLog().append(entry); @@ -651,7 +658,7 @@ public class LeaderTest extends AbstractLeaderTest { // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, "state-id", entry)); + leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertTrue(raftBehavior instanceof Leader); @@ -688,7 +695,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.setSnapshot(null); // new entry - ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); actorContext.getReplicatedLog().append(entry); @@ -696,22 +703,21 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); - Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @Test @@ -734,7 +740,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().removeFrom(0); + AtomicReference> installSnapshotStream = new AtomicReference<>(); + actorContext.setCreateSnapshotProcedure(out -> installSnapshotStream.set(out)); + leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); // Leader will send an immediate heartbeat - ignore it. MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -742,13 +752,13 @@ public class LeaderTest extends AbstractLeaderTest { // set the snapshot as absent and check if capture-snapshot is invoked. leader.setSnapshot(null); - for(int i=0;i<4;i++) { - actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1, + for (int i = 0; i < 4; i++) { + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1, new MockRaftActorContext.MockPayload("X" + i))); } // new entry - ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm, + SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")); actorContext.getReplicatedLog().append(entry); @@ -758,22 +768,40 @@ public class LeaderTest extends AbstractLeaderTest { // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets // installed with a SendInstallSnapshot - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); - // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry)); + assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get()); + assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent()); + + MessageCollectorActor.clearMessages(followerActor); + + // Sending Replicate message should not initiate another capture since the first is in progress. + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + + // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); - Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. + final byte[] bytes = new byte[]{1, 2, 3}; + installSnapshotStream.get().get().write(bytes); + actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(), + Runtime.getRuntime().totalMemory()); + MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + + // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk. + MessageCollectorActor.clearMessages(followerActor); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200); } @@ -781,7 +809,7 @@ public class LeaderTest extends AbstractLeaderTest { public void testInstallSnapshot() throws Exception { logStart("testInstallSnapshot"); - MockRaftActorContext actorContext = createActorContextWithFollower(); + final MockRaftActorContext actorContext = createActorContextWithFollower(); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -811,17 +839,19 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); - Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), - Collections.emptyList(), - lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); assertTrue(raftBehavior instanceof Leader); // check if installsnapshot gets called with the correct values. - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertNotNull(installSnapshot.getData()); assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); @@ -834,7 +864,7 @@ public class LeaderTest extends AbstractLeaderTest { public void testForceInstallSnapshot() throws Exception { logStart("testForceInstallSnapshot"); - MockRaftActorContext actorContext = createActorContextWithFollower(); + final MockRaftActorContext actorContext = createActorContextWithFollower(); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -861,17 +891,19 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(-1); - Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), - Collections.emptyList(), - lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); assertTrue(raftBehavior instanceof Leader); // check if installsnapshot gets called with the correct values. - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertNotNull(installSnapshot.getData()); assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex()); @@ -894,6 +926,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -913,11 +946,14 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm)); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); - while(!fts.isLastChunk(fts.getChunkIndex())) { + leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( + actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); + leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); + while (!fts.isLastChunk(fts.getChunkIndex())) { fts.getNextChunk(); fts.incrementChunkIndex(); } @@ -930,12 +966,13 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue(raftBehavior instanceof Leader); - assertEquals(0, leader.followerSnapshotSize()); assertEquals(1, leader.followerLogSize()); FollowerLogInformation fli = leader.getFollower(FOLLOWER_ID); assertNotNull(fli); + assertNull(fli.getInstallSnapshotState()); assertEquals(commitIndex, fli.getMatchIndex()); assertEquals(commitIndex + 1, fli.getNextIndex()); + assertFalse(leader.hasSnapshot()); } @Test @@ -949,7 +986,7 @@ public class LeaderTest extends AbstractLeaderTest { final int snapshotTerm = 1; final int currentTerm = 2; - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){ + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() { @Override public int getSnapshotChunkSize() { return 50; @@ -962,6 +999,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setCommitIndex(commitIndex); leader = new Leader(actorContext); + actorContext.setCurrentBehavior(leader); leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); @@ -977,13 +1015,14 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -1010,12 +1049,12 @@ public class LeaderTest extends AbstractLeaderTest { installSnapshot = MessageCollectorActor.getFirstMatching(followerActor, InstallSnapshot.class); - Assert.assertNull(installSnapshot); + assertNull(installSnapshot); } @Test - public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{ + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -1025,7 +1064,7 @@ public class LeaderTest extends AbstractLeaderTest { final int snapshotTerm = 1; final int currentTerm = 2; - actorContext.setConfigParams(new DefaultConfigParamsImpl(){ + actorContext.setConfigParams(new DefaultConfigParamsImpl() { @Override public int getSnapshotChunkSize() { return 50; @@ -1050,14 +1089,15 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); @@ -1114,19 +1154,21 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); - InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, + InstallSnapshot.class); assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE, installSnapshot.getLastChunkHashCode().get().intValue()); + assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, + installSnapshot.getLastChunkHashCode().get().intValue()); - int hashCode = Arrays.hashCode(installSnapshot.getData()); + final int hashCode = Arrays.hashCode(installSnapshot.getData()); followerActor.underlyingActor().clear(); @@ -1141,19 +1183,8 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testFollowerToSnapshotLogic() { - logStart("testFollowerToSnapshotLogic"); - - MockRaftActorContext actorContext = createActorContext(); - - actorContext.setConfigParams(new DefaultConfigParamsImpl() { - @Override - public int getSnapshotChunkSize() { - return 50; - } - }); - - leader = new Leader(actorContext); + public void testLeaderInstallSnapshotState() throws IOException { + logStart("testLeaderInstallSnapshotState"); Map leadersSnapshot = new HashMap<>(); leadersSnapshot.put("1", "A"); @@ -1163,22 +1194,22 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); byte[] barray = bs.toByteArray(); - FollowerToSnapshot fts = leader.new FollowerToSnapshot(bs); - leader.setFollowerSnapshot(FOLLOWER_ID, fts); + LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test"); + fts.setSnapshotBytes(ByteSource.wrap(barray)); assertEquals(bs.size(), barray.length); - int chunkIndex=0; - for (int i=0; i < barray.length; i = i + 50) { - int j = i + 50; + int chunkIndex = 0; + for (int i = 0; i < barray.length; i = i + 50) { + int length = i + 50; chunkIndex++; if (i + 50 > barray.length) { - j = barray.length; + length = barray.length; } byte[] chunk = fts.getNextChunk(); - assertEquals("bytestring size not matching for chunk:"+ chunkIndex, j-i, chunk.length); + assertEquals("bytestring size not matching for chunk:" + chunkIndex, length - i, chunk.length); assertEquals("chunkindex not matching", chunkIndex, fts.getChunkIndex()); fts.markSendStatus(true); @@ -1188,10 +1219,11 @@ public class LeaderTest extends AbstractLeaderTest { } assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); + fts.close(); } - @Override protected RaftActorBehavior createBehavior( - RaftActorContext actorContext) { + @Override + protected Leader createBehavior(final RaftActorContext actorContext) { return new Leader(actorContext); } @@ -1205,13 +1237,6 @@ public class LeaderTest extends AbstractLeaderTest { return createActorContext(LEADER_ID, actorRef); } - private MockRaftActorContext createActorContextWithFollower() { - MockRaftActorContext actorContext = createActorContext(); - actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, - followerActor.path().toString()).build()); - return actorContext; - } - private MockRaftActorContext createActorContext(String id, ActorRef actorRef) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); @@ -1222,6 +1247,13 @@ public class LeaderTest extends AbstractLeaderTest { return context; } + private MockRaftActorContext createActorContextWithFollower() { + MockRaftActorContext actorContext = createActorContext(); + actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, + followerActor.path().toString()).build()); + return actorContext; + } + private MockRaftActorContext createFollowerActorContextWithLeader() { MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl(); @@ -1235,12 +1267,13 @@ public class LeaderTest extends AbstractLeaderTest { public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); - MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + final MockRaftActorContext leaderActorContext = createActorContextWithFollower(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map peerAddresses = new HashMap<>(); peerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1264,10 +1297,11 @@ public class LeaderTest extends AbstractLeaderTest { followerActorContext.setCommitIndex(1); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(-1, appendEntries.getLeaderCommit()); assertEquals(0, appendEntries.getEntries().size()); assertEquals(0, appendEntries.getPrevLogIndex()); @@ -1288,13 +1322,14 @@ public class LeaderTest extends AbstractLeaderTest { public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex"); - MockRaftActorContext leaderActorContext = createActorContext(); + final MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); Map leaderPeerAddresses = new HashMap<>(); leaderPeerAddresses.put(FOLLOWER_ID, followerActor.path().toString()); @@ -1321,7 +1356,7 @@ public class LeaderTest extends AbstractLeaderTest { // Initial heartbeat AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(-1, appendEntries.getLeaderCommit()); assertEquals(0, appendEntries.getEntries().size()); assertEquals(0, appendEntries.getPrevLogIndex()); @@ -1359,7 +1394,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader(){ + public void testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader() { logStart("testHandleAppendEntriesReplyFailureWithFollowersLogBehindTheLeader"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); @@ -1372,8 +1407,8 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setCommitIndex(leaderCommitIndex); leaderActorContext.setLastApplied(leaderCommitIndex); - ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); - ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); @@ -1388,13 +1423,14 @@ public class LeaderTest extends AbstractLeaderTest { leader = new Leader(leaderActorContext); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); MessageCollectorActor.clearMessages(followerActor); MessageCollectorActor.clearMessages(leaderActor); - // Verify initial AppendEntries sent with the leader's current commit index. - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + // Verify initial AppendEntries sent. + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); assertEquals("Log entries size", 0, appendEntries.getEntries().size()); assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); @@ -1406,7 +1442,7 @@ public class LeaderTest extends AbstractLeaderTest { appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex()); @@ -1451,8 +1487,8 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setCommitIndex(leaderCommitIndex); leaderActorContext.setLastApplied(leaderCommitIndex); - ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); - ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); @@ -1462,21 +1498,24 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); MessageCollectorActor.clearMessages(followerActor); MessageCollectorActor.clearMessages(leaderActor); // Verify initial AppendEntries sent with the leader's current commit index. - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); assertEquals("Log entries size", 0, appendEntries.getEntries().size()); assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1516,7 +1555,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent(){ + public void testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent() { logStart("testHandleAppendEntriesReplyFailureWithFollowersLogTermDifferent"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); @@ -1529,8 +1568,8 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setCommitIndex(leaderCommitIndex); leaderActorContext.setLastApplied(leaderCommitIndex); - ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); - ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); @@ -1541,21 +1580,24 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); MessageCollectorActor.clearMessages(followerActor); MessageCollectorActor.clearMessages(leaderActor); // Verify initial AppendEntries sent with the leader's current commit index. - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); assertEquals("Log entries size", 0, appendEntries.getEntries().size()); assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); @@ -1598,7 +1640,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyWithNewerTerm(){ + public void testHandleAppendEntriesReplyWithNewerTerm() { logStart("testHandleAppendEntriesReplyWithNewerTerm"); MockRaftActorContext leaderActorContext = createActorContext(); @@ -1612,7 +1654,8 @@ public class LeaderTest extends AbstractLeaderTest { leaderActor.underlyingActor().setBehavior(leader); leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender()); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); assertEquals(false, appendEntriesReply.isSuccess()); assertEquals(RaftState.Follower, leaderActor.underlyingActor().getFirstBehaviorChange().state()); @@ -1621,7 +1664,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled(){ + public void testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled() { logStart("testHandleAppendEntriesReplyWithNewerTermWhenElectionsAreDisabled"); MockRaftActorContext leaderActorContext = createActorContext(); @@ -1636,7 +1679,8 @@ public class LeaderTest extends AbstractLeaderTest { leaderActor.underlyingActor().setBehavior(leader); leaderActor.tell(new AppendEntriesReply("foo", 20, false, 1000, 10, (short) 1), ActorRef.noSender()); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); assertEquals(false, appendEntriesReply.isSuccess()); assertEquals(RaftState.Leader, leaderActor.underlyingActor().getFirstBehaviorChange().state()); @@ -1664,7 +1708,6 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion()); - short payloadVersion = 5; AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply); @@ -1696,7 +1739,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplyUnknownFollower(){ + public void testHandleAppendEntriesReplyUnknownFollower() { logStart("testHandleAppendEntriesReplyUnknownFollower"); MockRaftActorContext leaderActorContext = createActorContext(); @@ -1725,10 +1768,10 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setCommitIndex(leaderCommitIndex); leaderActorContext.setLastApplied(leaderCommitIndex); - ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); - ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); - ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); - ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3); + final ReplicatedLogEntry leadersFirstLogEntry = leaderActorContext.getReplicatedLog().get(0); + final ReplicatedLogEntry leadersSecondLogEntry = leaderActorContext.getReplicatedLog().get(1); + final ReplicatedLogEntry leadersThirdLogEntry = leaderActorContext.getReplicatedLog().get(2); + final ReplicatedLogEntry leadersFourthLogEntry = leaderActorContext.getReplicatedLog().get(3); MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); @@ -1738,25 +1781,29 @@ public class LeaderTest extends AbstractLeaderTest { Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); + followerActorContext.setCurrentBehavior(follower); leader = new Leader(leaderActorContext); AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); - AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + final AppendEntriesReply appendEntriesReply = MessageCollectorActor.expectFirstMatching(leaderActor, + AppendEntriesReply.class); MessageCollectorActor.clearMessages(followerActor); MessageCollectorActor.clearMessages(leaderActor); // Verify initial AppendEntries sent with the leader's current commit index. - assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); assertEquals("Log entries size", 0, appendEntries.getEntries().size()); assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex()); leaderActor.underlyingActor().setBehavior(leader); + leaderActorContext.setCurrentBehavior(leader); leader.handleMessage(followerActor, appendEntriesReply); - List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); + List appendEntriesList = MessageCollectorActor.expectMatching(followerActor, + AppendEntries.class, 2); MessageCollectorActor.expectMatching(leaderActor, AppendEntriesReply.class, 2); appendEntries = appendEntriesList.get(0); @@ -1773,7 +1820,7 @@ public class LeaderTest extends AbstractLeaderTest { appendEntries = appendEntriesList.get(1); assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit()); - assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex()); @@ -1793,7 +1840,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleRequestVoteReply(){ + public void testHandleRequestVoteReply() { logStart("testHandleRequestVoteReply"); MockRaftActorContext leaderActorContext = createActorContext(); @@ -1818,11 +1865,30 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContext(); leader = new Leader(leaderActorContext); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); - Assert.assertTrue(behavior instanceof Leader); + RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue(newBehavior instanceof Leader); } - private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy){ + @Test + public void testIsolatedLeaderCheckNoVotingFollowers() { + logStart("testIsolatedLeaderCheckNoVotingFollowers"); + + MockRaftActorContext followerActorContext = createFollowerActorContextWithLeader(); + Follower follower = new Follower(followerActorContext); + followerActor.underlyingActor().setBehavior(follower); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(1000, TimeUnit.SECONDS)); + leaderActorContext.getPeerInfo(FOLLOWER_ID).setVotingState(VotingState.NON_VOTING); + + leader = new Leader(leaderActorContext); + leader.getFollower(FOLLOWER_ID).markFollowerActive(); + RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Expected Leader", newBehavior instanceof Leader); + } + + private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) { ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1"); ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2"); @@ -1839,9 +1905,8 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive("follower-1"); leader.markFollowerActive("follower-2"); - RaftActorBehavior behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); - Assert.assertTrue("Behavior not instance of Leader when all followers are active", - behavior instanceof Leader); + RaftActorBehavior newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader); // kill 1 follower and verify if that got killed final JavaTestKit probe = new JavaTestKit(getSystem()); @@ -1852,9 +1917,9 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerInActive("follower-1"); leader.markFollowerActive("follower-2"); - behavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); - Assert.assertTrue("Behavior not instance of Leader when majority of followers are active", - behavior instanceof Leader); + newBehavior = leader.handleMessage(leaderActor, Leader.ISOLATED_LEADER_CHECK); + assertTrue("Behavior not instance of Leader when majority of followers are active", + newBehavior instanceof Leader); // kill 2nd follower and leader should change to Isolated leader followerActor2.tell(PoisonPill.getInstance(), null); @@ -1871,80 +1936,77 @@ public class LeaderTest extends AbstractLeaderTest { public void testIsolatedLeaderCheckTwoFollowers() throws Exception { logStart("testIsolatedLeaderCheckTwoFollowers"); - RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); + RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); - Assert.assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", - behavior instanceof IsolatedLeader); + assertTrue("Behavior not instance of IsolatedLeader when majority followers are inactive", + newBehavior instanceof IsolatedLeader); } @Test public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception { logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled"); - RaftActorBehavior behavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); + RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); - Assert.assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", - behavior instanceof Leader); + assertTrue("Behavior should not switch to IsolatedLeader because elections are disabled", + newBehavior instanceof Leader); } @Test public void testLaggingFollowerStarvation() throws Exception { logStart("testLaggingFollowerStarvation"); - new JavaTestKit(getSystem()) {{ - String leaderActorId = actorFactory.generateActorId("leader"); - String follower1ActorId = actorFactory.generateActorId("follower"); - String follower2ActorId = actorFactory.generateActorId("follower"); - TestActorRef leaderActor = - actorFactory.createTestActor(ForwardMessageToBehaviorActor.props(), leaderActorId); - ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); - ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); + String leaderActorId = actorFactory.generateActorId("leader"); + String follower1ActorId = actorFactory.generateActorId("follower"); + String follower2ActorId = actorFactory.generateActorId("follower"); - MockRaftActorContext leaderActorContext = - new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); + final ActorRef follower1Actor = actorFactory.createActor(MessageCollectorActor.props(), follower1ActorId); + final ActorRef follower2Actor = actorFactory.createActor(MessageCollectorActor.props(), follower2ActorId); - DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); - configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); - configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + MockRaftActorContext leaderActorContext = + new MockRaftActorContext(leaderActorId, getSystem(), leaderActor); - leaderActorContext.setConfigParams(configParams); + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(200, TimeUnit.MILLISECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); - leaderActorContext.setReplicatedLog( - new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); + leaderActorContext.setConfigParams(configParams); - Map peerAddresses = new HashMap<>(); - peerAddresses.put(follower1ActorId, - follower1Actor.path().toString()); - peerAddresses.put(follower2ActorId, - follower2Actor.path().toString()); + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(1,5,1).build()); - leaderActorContext.setPeerAddresses(peerAddresses); - leaderActorContext.getTermInformation().update(1, leaderActorId); + Map peerAddresses = new HashMap<>(); + peerAddresses.put(follower1ActorId, + follower1Actor.path().toString()); + peerAddresses.put(follower2ActorId, + follower2Actor.path().toString()); - RaftActorBehavior leader = createBehavior(leaderActorContext); + leaderActorContext.setPeerAddresses(peerAddresses); + leaderActorContext.getTermInformation().update(1, leaderActorId); - leaderActor.underlyingActor().setBehavior(leader); + leader = createBehavior(leaderActorContext); - for(int i=1;i<6;i++) { - // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) - RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0)); - assertTrue(newBehavior == leader); - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); - } + leaderActor.underlyingActor().setBehavior(leader); - // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply - List heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); + for (int i = 1; i < 6; i++) { + // Each AppendEntriesReply could end up rescheduling the heartbeat (without the fix for bug 2733) + RaftActorBehavior newBehavior = leader.handleMessage(follower1Actor, + new AppendEntriesReply(follower1ActorId, 1, true, i, 1, (short)0)); + assertTrue(newBehavior == leader); + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + } - assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()), - heartbeats.size() > 1); + // Check if the leader has been receiving SendHeartbeat messages despite getting AppendEntriesReply + List heartbeats = MessageCollectorActor.getAllMatching(leaderActor, SendHeartBeat.class); - // Check if follower-2 got AppendEntries during this time and was not starved - List appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class); + assertTrue(String.format("%s heartbeat(s) is less than expected", heartbeats.size()), + heartbeats.size() > 1); - assertTrue(String.format("%s append entries is less than expected", appendEntries.size()), - appendEntries.size() > 1); + // Check if follower-2 got AppendEntries during this time and was not starved + List appendEntries = MessageCollectorActor.getAllMatching(follower2Actor, AppendEntries.class); - }}; + assertTrue(String.format("%s append entries is less than expected", appendEntries.size()), + appendEntries.size() > 1); } @Test @@ -1956,14 +2018,18 @@ public class LeaderTest extends AbstractLeaderTest { new FiniteDuration(1000, TimeUnit.SECONDS)); leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); String nonVotingFollowerId = "nonvoting-follower"; TestActorRef nonVotingFollowerActor = actorFactory.createTestActor( Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId)); - leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); + leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), + VotingState.NON_VOTING); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Ignore initial heartbeats MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2015,11 +2081,13 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testTransferLeadershipWithFollowerInSync"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + leaderActorContext.setLastApplied(-1); ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(1000, TimeUnit.SECONDS)); leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2037,6 +2105,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); @@ -2044,7 +2113,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectMatching(followerActor, AppendEntries.class, 2); // Leader should force an election timeout - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); verify(mockTransferCohort).transferComplete(); } @@ -2059,6 +2128,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2066,6 +2136,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2076,7 +2147,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // Leader should force an election timeout - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); verify(mockTransferCohort).transferComplete(); } @@ -2090,12 +2161,14 @@ public class LeaderTest extends AbstractLeaderTest { new FiniteDuration(200, TimeUnit.MILLISECONDS)); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2105,14 +2178,14 @@ public class LeaderTest extends AbstractLeaderTest { leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0)); MessageCollectorActor.clearMessages(followerActor); - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). - getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 1, 1, (short)0)); // Leader should force an election timeout - MessageCollectorActor.expectFirstMatching(followerActor, ElectionTimeout.class); + MessageCollectorActor.expectFirstMatching(followerActor, TimeoutNow.class); verify(mockTransferCohort).transferComplete(); } @@ -2128,6 +2201,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); // Initial heartbeat MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); @@ -2145,9 +2219,9 @@ public class LeaderTest extends AbstractLeaderTest { verify(mockTransferCohort, never()).transferComplete(); // Send heartbeats to time out the transfer. - for(int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { - Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams(). - getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); + for (int i = 0; i < leaderActorContext.getConfigParams().getElectionTimeoutFactor(); i++) { + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis() + 1, TimeUnit.MILLISECONDS); leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); } @@ -2157,7 +2231,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(RaftActorContext actorContext, + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, ActorRef actorRef, RaftRPC rpc) throws Exception { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); @@ -2168,7 +2242,7 @@ public class LeaderTest extends AbstractLeaderTest { private final long electionTimeOutIntervalMillis; private final int snapshotChunkSize; - public MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { + MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { super(); this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis; this.snapshotChunkSize = snapshotChunkSize;