X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=3805576ae19b15b40d991427c4a2a4bdf9b3599b;hb=refs%2Fchanges%2F98%2F82498%2F1;hp=380a0c2715aa2e76202da6928895813871702fb8;hpb=79e6240ad565717e2fba62a339f11fcbd239f440;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 380a0c2715..3805576ae1 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -14,6 +14,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -22,19 +23,28 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; -import akka.testkit.JavaTestKit; +import akka.protobuf.ByteString; import akka.testkit.TestActorRef; +import akka.testkit.javadsl.TestKit; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; -import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.OutputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.SerializationUtils; import org.junit.After; import org.junit.Test; +import org.opendaylight.controller.cluster.messaging.MessageSlice; +import org.opendaylight.controller.cluster.messaging.MessageSliceReply; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.FollowerLogInformation; import org.opendaylight.controller.cluster.raft.MockRaftActorContext; @@ -43,7 +53,6 @@ import org.opendaylight.controller.cluster.raft.RaftActorLeadershipTransferCohor import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.RaftVersions; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; -import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.VotingState; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; @@ -52,6 +61,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.Replicate; import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader.SnapshotHolder; import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; @@ -59,9 +69,12 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.ByteState; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.yangtools.concepts.Identifier; @@ -83,7 +96,7 @@ public class LeaderTest extends AbstractLeaderTest { @Override @After - public void tearDown() throws Exception { + public void tearDown() { if (leader != null) { leader.close(); } @@ -92,7 +105,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleMessageForUnknownMessage() throws Exception { + public void testHandleMessageForUnknownMessage() { logStart("testHandleMessageForUnknownMessage"); leader = new Leader(createActorContext()); @@ -102,7 +115,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception { + public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -147,20 +160,24 @@ public class LeaderTest extends AbstractLeaderTest { } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) { + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) { return sendReplicate(actorContext, 1, index); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { - MockRaftActorContext.MockPayload payload = new MockRaftActorContext.MockPayload("foo"); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - term, index, payload); + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, + final long index) { + return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo")); + } + + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index, + final Payload payload) { + SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); actorContext.getReplicatedLog().append(newEntry); - return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry)); + return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); } @Test - public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception { + public void testHandleReplicateMessageSendAppendEntriesToFollower() { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -197,7 +214,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception { + public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() { logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -252,7 +269,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { + public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() { logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -290,7 +307,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception { + public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -330,7 +347,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception { + public void testMultipleReplicateWithReplyShouldResultInAppendEntries() { logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -361,26 +378,47 @@ public class LeaderTest extends AbstractLeaderTest { sendReplicate(actorContext, lastIndex + i + 1); leader.handleMessage(followerActor, new AppendEntriesReply( FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); - } - for (int i = 3; i < 5; i++) { - sendReplicate(actorContext, lastIndex + i + 1); + // We are expecting six messages here -- a request to replicate and a consensus-reached message + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of request/consensus appends collected", 6, allMessages.size()); + for (int i = 0; i < 3; i++) { + assertRequestEntry(lastIndex, allMessages, i); + assertCommitEntry(lastIndex, allMessages, i); } - List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); - // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would - // get sent to the follower - but not the 5th - assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + // Now perform another commit, eliciting a request to persist + sendReplicate(actorContext, lastIndex + 3 + 1); + allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // This elicits another message for request to replicate + assertEquals("The number of request entries collected", 7, allMessages.size()); + assertRequestEntry(lastIndex, allMessages, 3); - for (int i = 0; i < 4; i++) { - long expected = allMessages.get(i).getEntries().get(0).getIndex(); - assertEquals(expected, i + 2); - } + sendReplicate(actorContext, lastIndex + 4 + 1); + allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of request entries collected", 7, allMessages.size()); + } + + private static void assertCommitEntry(final long lastIndex, final List allMessages, + final int messageNr) { + final AppendEntries commitReq = allMessages.get(2 * messageNr + 1); + assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit()); + assertEquals(ImmutableList.of(), commitReq.getEntries()); + } + + private static void assertRequestEntry(final long lastIndex, final List allMessages, + final int messageNr) { + final AppendEntries req = allMessages.get(2 * messageNr); + assertEquals(lastIndex + messageNr, req.getLeaderCommit()); + + final List entries = req.getEntries(); + assertEquals(1, entries.size()); + assertEquals(messageNr + 2, entries.get(0).getIndex()); } @Test - public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception { + public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() { logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -425,7 +463,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception { + public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() { logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -462,7 +500,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception { + public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() { logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -502,7 +540,7 @@ public class LeaderTest extends AbstractLeaderTest { @Test - public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { + public void testHandleReplicateMessageWhenThereAreNoFollowers() { logStart("testHandleReplicateMessageWhenThereAreNoFollowers"); MockRaftActorContext actorContext = createActorContext(); @@ -513,13 +551,14 @@ public class LeaderTest extends AbstractLeaderTest { long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; long term = actorContext.getTermInformation().getCurrentTerm(); - MockRaftActorContext.MockReplicatedLogEntry newEntry = new MockRaftActorContext.MockReplicatedLogEntry( - term, newLogIndex, new MockRaftActorContext.MockPayload("foo")); + ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry( + newLogIndex, term, new MockRaftActorContext.MockPayload("foo")); actorContext.getReplicatedLog().append(newEntry); final Identifier id = new MockIdentifier("state-id"); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new Replicate(leaderActor, id, newEntry)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new Replicate(leaderActor, id, newEntry, true)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -532,7 +571,7 @@ public class LeaderTest extends AbstractLeaderTest { leaderActor, ApplyState.class); assertEquals("ApplyState count", newLogIndex, applyStateList.size()); - for (int i = 0; i <= newLogIndex - 1; i++ ) { + for (int i = 0; i <= newLogIndex - 1; i++) { ApplyState applyState = applyStateList.get(i); assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex()); assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm()); @@ -577,11 +616,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.markFollowerActive(FOLLOWER_ID); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm)); + leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); //send first chunk and no InstallSnapshotReply received yet @@ -608,7 +648,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendAppendEntriesSnapshotScenario() throws Exception { + public void testSendAppendEntriesSnapshotScenario() { logStart("testSendAppendEntriesSnapshotScenario"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -649,7 +689,7 @@ public class LeaderTest extends AbstractLeaderTest { // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertTrue(raftBehavior instanceof Leader); @@ -657,7 +697,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testInitiateInstallSnapshot() throws Exception { + public void testInitiateInstallSnapshot() { logStart("testInitiateInstallSnapshot"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -683,7 +723,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(null); + leader.setSnapshotHolder(null); // new entry SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, @@ -694,20 +734,19 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -732,6 +771,9 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().removeFrom(0); + AtomicReference> installSnapshotStream = new AtomicReference<>(); + actorContext.setCreateSnapshotProcedure(installSnapshotStream::set); + leader = new Leader(actorContext); actorContext.setCurrentBehavior(leader); @@ -739,7 +781,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(null); + leader.setSnapshotHolder(null); for (int i = 0; i < 4; i++) { actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1, @@ -757,27 +799,48 @@ public class LeaderTest extends AbstractLeaderTest { // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets // installed with a SendInstallSnapshot - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - - assertTrue(cs.isInstallSnapshotInitiated()); assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); - // if an initiate is started again when first is in progress, it should not initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry)); + assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get()); + assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent()); + + MessageCollectorActor.clearMessages(followerActor); + // Sending Replicate message should not initiate another capture since the first is in progress. + leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + + // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + + // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. + final byte[] bytes = new byte[]{1, 2, 3}; + installSnapshotStream.get().get().write(bytes); + actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(), + Runtime.getRuntime().totalMemory()); + MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + + // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk. + MessageCollectorActor.clearMessages(followerActor); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); + MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200); } @Test - public void testInstallSnapshot() throws Exception { + public void testInstallSnapshot() { logStart("testInstallSnapshot"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -810,11 +873,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(0); - Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), - Collections.emptyList(), - lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); assertTrue(raftBehavior instanceof Leader); @@ -831,7 +895,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testForceInstallSnapshot() throws Exception { + public void testForceInstallSnapshot() { logStart("testForceInstallSnapshot"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -861,11 +925,12 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setMatchIndex(-1); leader.getFollower(FOLLOWER_ID).setNextIndex(-1); - Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(), - Collections.emptyList(), - lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm); + byte[] bytes = toByteString(leadersSnapshot).toByteArray(); + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, + new SendInstallSnapshot(snapshot, ByteSource.wrap(bytes))); assertTrue(raftBehavior instanceof Leader); @@ -915,11 +980,12 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm)); + leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); while (!fts.isLastChunk(fts.getChunkIndex())) { fts.getNextChunk(); @@ -944,7 +1010,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendSnapshotfromInstallSnapshotReply() throws Exception { + public void testSendSnapshotfromInstallSnapshotReply() { logStart("testSendSnapshotfromInstallSnapshotReply"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -983,11 +1049,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1022,7 +1088,7 @@ public class LeaderTest extends AbstractLeaderTest { @Test - public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() { logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -1057,12 +1123,12 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1087,7 +1153,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception { + public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() { logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -1122,11 +1188,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - Snapshot snapshot = Snapshot.create(bs.toByteArray(), Collections.emptyList(), - commitIndex, snapshotTerm, commitIndex, snapshotTerm); - leader.setSnapshot(snapshot); + Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), + Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + -1, null, null); - leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot)); + leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); @@ -1151,7 +1217,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLeaderInstallSnapshotState() { + public void testLeaderInstallSnapshotState() throws IOException { logStart("testLeaderInstallSnapshotState"); Map leadersSnapshot = new HashMap<>(); @@ -1163,7 +1229,7 @@ public class LeaderTest extends AbstractLeaderTest { byte[] barray = bs.toByteArray(); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test"); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(barray)); assertEquals(bs.size(), barray.length); @@ -1187,6 +1253,7 @@ public class LeaderTest extends AbstractLeaderTest { } assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); + fts.close(); } @Override @@ -1200,11 +1267,11 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - protected MockRaftActorContext createActorContext(ActorRef actorRef) { + protected MockRaftActorContext createActorContext(final ActorRef actorRef) { return createActorContext(LEADER_ID, actorRef); } - private MockRaftActorContext createActorContext(String id, ActorRef actorRef) { + private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); @@ -1231,7 +1298,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { + public void testLeaderCreatedWithCommitIndexLessThanLastIndex() { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); final MockRaftActorContext leaderActorContext = createActorContextWithFollower(); @@ -1286,7 +1353,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { + public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() { logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex"); final MockRaftActorContext leaderActorContext = createActorContext(); @@ -1656,7 +1723,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplySuccess() throws Exception { + public void testHandleAppendEntriesReplySuccess() { logStart("testHandleAppendEntriesReplySuccess"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); @@ -1855,7 +1922,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue("Expected Leader", newBehavior instanceof Leader); } - private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) { + private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) { ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1"); ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2"); @@ -1876,7 +1943,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader); // kill 1 follower and verify if that got killed - final JavaTestKit probe = new JavaTestKit(getSystem()); + final TestKit probe = new TestKit(getSystem()); probe.watch(followerActor1); followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); @@ -1900,7 +1967,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testIsolatedLeaderCheckTwoFollowers() throws Exception { + public void testIsolatedLeaderCheckTwoFollowers() { logStart("testIsolatedLeaderCheckTwoFollowers"); RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); @@ -1910,7 +1977,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception { + public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() { logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled"); RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); @@ -1920,7 +1987,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLaggingFollowerStarvation() throws Exception { + public void testLaggingFollowerStarvation() { logStart("testLaggingFollowerStarvation"); String leaderActorId = actorFactory.generateActorId("leader"); @@ -1989,8 +2056,8 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setLastApplied(-1); String nonVotingFollowerId = "nonvoting-follower"; - TestActorRef nonVotingFollowerActor = actorFactory.createTestActor( - Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId)); + ActorRef nonVotingFollowerActor = actorFactory.createActor( + MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId)); leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); @@ -2072,6 +2139,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); @@ -2102,6 +2170,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2133,6 +2202,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); + doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2194,9 +2264,201 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.assertNoneMatching(followerActor, ElectionTimeout.class, 100); } + @Test + public void testReplicationWithPayloadSizeThatExceedsThreshold() { + logStart("testReplicationWithPayloadSizeThatExceedsThreshold"); + + final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1, + Arrays.asList(new SimpleReplicatedLogEntry(0, 1, + new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length; + final MockRaftActorContext.MockPayload largePayload = + new MockRaftActorContext.MockPayload("large", serializedSize); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(300, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Send initial heartbeat reply so follower is marked active + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + // Send normal payload first to prime commit index. + final long term = leaderActorContext.getTermInformation().getCurrentTerm(); + sendReplicate(leaderActorContext, term, 0); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex()); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0)); + assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex()); + MessageCollectorActor.clearMessages(followerActor); + + // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced. + sendReplicate(leaderActorContext, term, 1, largePayload); + + MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex()); + assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices()); + + final Identifier slicingId = messageSlice.getIdentifier(); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); + assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + MessageCollectorActor.clearMessages(followerActor); + + // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress. + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + MessageCollectorActor.clearMessages(followerActor); + + // Simulate the MessageSliceReply's and AppendEntriesReply from the follower. + + leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor)); + messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex()); + + leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor)); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0)); + + MessageCollectorActor.clearMessages(followerActor); + + // Send another normal payload. + + sendReplicate(leaderActorContext, term, 2); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("Entries size", 1, appendEntries.getEntries().size()); + assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit()); + } + + @Test + public void testLargePayloadSlicingExpiration() { + logStart("testLargePayloadSlicingExpiration"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + new FiniteDuration(100, TimeUnit.MILLISECONDS)); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + final long term = leaderActorContext.getTermInformation().getCurrentTerm(); + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Send initial heartbeat reply so follower is marked active + MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0)); + MessageCollectorActor.clearMessages(followerActor); + + sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large", + leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1)); + MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + + // Sleep for at least 3 * election timeout so the slicing state expires. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS); + MessageCollectorActor.clearMessages(followerActor); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit()); + assertEquals("Entries size", 0, appendEntries.getEntries().size()); + + MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300); + MessageCollectorActor.clearMessages(followerActor); + + // Send an AppendEntriesReply - this should restart the slicing. + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS); + + leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0)); + + MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); + } + + @Test + public void testLeaderAddressInAppendEntries() { + logStart("testLeaderAddressInAppendEntries"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + FiniteDuration.create(50, TimeUnit.MILLISECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver( + peerId -> leaderActor.path().toString()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat shouldn't have the leader address + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertFalse(appendEntries.getLeaderAddress().isPresent()); + MessageCollectorActor.clearMessages(followerActor); + + // Send AppendEntriesReply indicating the follower needs the leader address + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true, + RaftVersions.CURRENT_VERSION)); + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertTrue(appendEntries.getLeaderAddress().isPresent()); + assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get()); + MessageCollectorActor.clearMessages(followerActor); + + // Send AppendEntriesReply indicating the follower does not need the leader address + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false, + RaftVersions.CURRENT_VERSION)); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertFalse(appendEntries.getLeaderAddress().isPresent()); + } + @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, - ActorRef actorRef, RaftRPC rpc) throws Exception { + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, + final ActorRef actorRef, final RaftRPC rpc) { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); } @@ -2206,8 +2468,7 @@ public class LeaderTest extends AbstractLeaderTest { private final long electionTimeOutIntervalMillis; private final int snapshotChunkSize; - MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { - super(); + MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) { this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis; this.snapshotChunkSize = snapshotChunkSize; }