X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FLeaderTest.java;h=d9a5487e556171813bf47d49af2a18a85b0b165c;hb=HEAD;hp=930c1968ace2465936ff2c42b37869f7742012b6;hpb=8d90cf04be86f872f7eeb892d37517d5ee087157;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 930c1968ac..dc84644d3b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; @@ -23,20 +22,19 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; -import akka.testkit.JavaTestKit; +import akka.protobuf.ByteString; import akka.testkit.TestActorRef; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; +import akka.testkit.javadsl.TestKit; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; -import com.google.protobuf.ByteString; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.SerializationUtils; @@ -65,6 +63,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot; import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.messages.RaftRPC; import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; @@ -73,7 +72,6 @@ import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEnt import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.policy.DefaultRaftPolicy; import org.opendaylight.controller.cluster.raft.policy.RaftPolicy; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.controller.cluster.raft.utils.ForwardMessageToBehaviorActor; import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor; import org.opendaylight.yangtools.concepts.Identifier; @@ -95,7 +93,7 @@ public class LeaderTest extends AbstractLeaderTest { @Override @After - public void tearDown() throws Exception { + public void tearDown() { if (leader != null) { leader.close(); } @@ -104,7 +102,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleMessageForUnknownMessage() throws Exception { + public void testHandleMessageForUnknownMessage() { logStart("testHandleMessageForUnknownMessage"); leader = new Leader(createActorContext()); @@ -114,7 +112,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() throws Exception { + public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() { logStart("testThatLeaderSendsAHeartbeatMessageToAllFollowers"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -153,28 +151,29 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex()); assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); assertEquals("Entries size", 1, appendEntries.getEntries().size()); - assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex()); - assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).index()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).term()); assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion()); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) { + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) { return sendReplicate(actorContext, 1, index); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index) { + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, + final long index) { return sendReplicate(actorContext, term, index, new MockRaftActorContext.MockPayload("foo")); } - private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long term, long index, Payload payload) { - SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload); - actorContext.getReplicatedLog().append(newEntry); - return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true)); + private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index, + final Payload payload) { + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(index, term, payload)); + return leader.handleMessage(leaderActor, new Replicate(index, true, null, null)); } @Test - public void testHandleReplicateMessageSendAppendEntriesToFollower() throws Exception { + public void testHandleReplicateMessageSendAppendEntriesToFollower() { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -204,14 +203,14 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); assertEquals("Entries size", 1, appendEntries.getEntries().size()); - assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); - assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).index()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).term()); assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex()); } @Test - public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() throws Exception { + public void testHandleReplicateMessageWithHigherTermThanPreviousEntry() { logStart("testHandleReplicateMessageWithHigherTermThanPreviousEntry"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -252,8 +251,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm()); assertEquals("Entries size", 1, appendEntries.getEntries().size()); - assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex()); - assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).index()); + assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).term()); assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); // The follower replies with success. The leader should now update the commit index to the new index @@ -266,7 +265,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() throws Exception { + public void testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus() { logStart("testHandleReplicateMessageCommitIndexIncrementedBeforeConsensus"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -297,14 +296,14 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex()); assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm()); assertEquals("Entries size", 1, appendEntries.getEntries().size()); - assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex()); - assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm()); + assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).index()); + assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).term()); assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString()); assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex()); } @Test - public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() throws Exception { + public void testMultipleReplicateShouldNotCauseDuplicateAppendEntriesToBeSent() { logStart("testHandleReplicateMessageSendAppendEntriesToFollower"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -344,7 +343,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testMultipleReplicateWithReplyShouldResultInAppendEntries() throws Exception { + public void testMultipleReplicateWithReplyShouldResultInAppendEntries() { logStart("testMultipleReplicateWithReplyShouldResultInAppendEntries"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -375,26 +374,47 @@ public class LeaderTest extends AbstractLeaderTest { sendReplicate(actorContext, lastIndex + i + 1); leader.handleMessage(followerActor, new AppendEntriesReply( FOLLOWER_ID, term, true, lastIndex + i + 1, term, (short)0)); - } - for (int i = 3; i < 5; i++) { - sendReplicate(actorContext, lastIndex + i + 1); + // We are expecting six messages here -- a request to replicate and a consensus-reached message + List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of request/consensus appends collected", 6, allMessages.size()); + for (int i = 0; i < 3; i++) { + assertRequestEntry(lastIndex, allMessages, i); + assertCommitEntry(lastIndex, allMessages, i); } - List allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); - // We expect 4 here because the first 3 replicate got a reply and so the 4th entry would - // get sent to the follower - but not the 5th - assertEquals("The number of append entries collected should be 4", 4, allMessages.size()); + // Now perform another commit, eliciting a request to persist + sendReplicate(actorContext, lastIndex + 3 + 1); + allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + // This elicits another message for request to replicate + assertEquals("The number of request entries collected", 7, allMessages.size()); + assertRequestEntry(lastIndex, allMessages, 3); - for (int i = 0; i < 4; i++) { - long expected = allMessages.get(i).getEntries().get(0).getIndex(); - assertEquals(expected, i + 2); - } + sendReplicate(actorContext, lastIndex + 4 + 1); + allMessages = MessageCollectorActor.getAllMatching(followerActor, AppendEntries.class); + assertEquals("The number of request entries collected", 7, allMessages.size()); + } + + private static void assertCommitEntry(final long lastIndex, final List allMessages, + final int messageNr) { + final AppendEntries commitReq = allMessages.get(2 * messageNr + 1); + assertEquals(lastIndex + messageNr + 1, commitReq.getLeaderCommit()); + assertEquals(List.of(), commitReq.getEntries()); + } + + private static void assertRequestEntry(final long lastIndex, final List allMessages, + final int messageNr) { + final AppendEntries req = allMessages.get(2 * messageNr); + assertEquals(lastIndex + messageNr, req.getLeaderCommit()); + + final List entries = req.getEntries(); + assertEquals(1, entries.size()); + assertEquals(messageNr + 2, entries.get(0).index()); } @Test - public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() throws Exception { + public void testDuplicateAppendEntriesWillBeSentOnHeartBeat() { logStart("testDuplicateAppendEntriesWillBeSentOnHeartBeat"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -432,14 +452,15 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("The number of append entries collected should be 2", 2, allMessages.size()); assertEquals(1, allMessages.get(0).getEntries().size()); - assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex()); + assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).index()); assertEquals(1, allMessages.get(1).getEntries().size()); - assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex()); + // FIXME: weird assert + assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).index()); } @Test - public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() throws Exception { + public void testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed() { logStart("testHeartbeatsAreAlwaysSentIfTheHeartbeatIntervalHasElapsed"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -476,7 +497,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() throws Exception { + public void testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate() { logStart("testSendingReplicateImmediatelyAfterHeartbeatDoesReplicate"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -516,7 +537,7 @@ public class LeaderTest extends AbstractLeaderTest { @Test - public void testHandleReplicateMessageWhenThereAreNoFollowers() throws Exception { + public void testHandleReplicateMessageWhenThereAreNoFollowers() { logStart("testHandleReplicateMessageWhenThereAreNoFollowers"); MockRaftActorContext actorContext = createActorContext(); @@ -525,16 +546,14 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setLastApplied(0); - long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; - long term = actorContext.getTermInformation().getCurrentTerm(); - ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry( - newLogIndex, term, new MockRaftActorContext.MockPayload("foo")); + final long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1; + final long term = actorContext.getTermInformation().getCurrentTerm(); + final var data = new MockRaftActorContext.MockPayload("foo"); - actorContext.getReplicatedLog().append(newEntry); + actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(newLogIndex, term, data)); final Identifier id = new MockIdentifier("state-id"); - RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, - new Replicate(leaderActor, id, newEntry, true)); + final var raftBehavior = leader.handleMessage(leaderActor, new Replicate(newLogIndex, true, leaderActor, id)); // State should not change assertTrue(raftBehavior instanceof Leader); @@ -543,18 +562,17 @@ public class LeaderTest extends AbstractLeaderTest { // We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous // one since lastApplied state is 0. - List applyStateList = MessageCollectorActor.getAllMatching( - leaderActor, ApplyState.class); + final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class); assertEquals("ApplyState count", newLogIndex, applyStateList.size()); for (int i = 0; i <= newLogIndex - 1; i++) { ApplyState applyState = applyStateList.get(i); - assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex()); - assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().index()); + assertEquals("getTerm", term, applyState.getReplicatedLogEntry().term()); } ApplyState last = applyStateList.get((int) newLogIndex - 1); - assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData()); + assertEquals("getData", data, last.getReplicatedLogEntry().getData()); assertEquals("getIdentifier", id, last.getIdentifier()); } @@ -564,11 +582,6 @@ public class LeaderTest extends AbstractLeaderTest { final MockRaftActorContext actorContext = createActorContextWithFollower(); - Map leadersSnapshot = new HashMap<>(); - leadersSnapshot.put("1", "A"); - leadersSnapshot.put("2", "B"); - leadersSnapshot.put("3", "C"); - //clears leaders log actorContext.getReplicatedLog().removeFrom(0); @@ -591,12 +604,12 @@ public class LeaderTest extends AbstractLeaderTest { //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + ByteString bs = toByteString(Map.of("1", "A", "2", "B", "3", "C")); + leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( - actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); + actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName()); fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); @@ -624,7 +637,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendAppendEntriesSnapshotScenario() throws Exception { + public void testSendAppendEntriesSnapshotScenario() { logStart("testSendAppendEntriesSnapshotScenario"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -654,18 +667,15 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // new entry - SimpleReplicatedLogEntry entry = - new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( - leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertTrue(raftBehavior instanceof Leader); @@ -673,7 +683,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testInitiateInstallSnapshot() throws Exception { + public void testInitiateInstallSnapshot() { logStart("testInitiateInstallSnapshot"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -699,18 +709,16 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(null); + leader.setSnapshotHolder(null); // new entry - SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -722,7 +730,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, cs.getLastTerm()); // if an initiate is started again when first is in progress, it shouldnt initiate Capture - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); } @@ -747,7 +755,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().removeFrom(0); - AtomicReference> installSnapshotStream = new AtomicReference<>(); + AtomicReference> installSnapshotStream = new AtomicReference<>(); actorContext.setCreateSnapshotProcedure(installSnapshotStream::set); leader = new Leader(actorContext); @@ -757,7 +765,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); // set the snapshot as absent and check if capture-snapshot is invoked. - leader.setSnapshot(null); + leader.setSnapshotHolder(null); for (int i = 0; i < 4; i++) { actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(i, 1, @@ -765,17 +773,16 @@ public class LeaderTest extends AbstractLeaderTest { } // new entry - SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, - new MockRaftActorContext.MockPayload("D")); - - actorContext.getReplicatedLog().append(entry); + actorContext.getReplicatedLog().append( + new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D"))); //update follower timestamp leader.markFollowerActive(FOLLOWER_ID); // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets // installed with a SendInstallSnapshot - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); @@ -791,29 +798,31 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); // Sending Replicate message should not initiate another capture since the first is in progress. - leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id"))); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. final byte[] bytes = new byte[]{1, 2, 3}; - installSnapshotStream.get().get().write(bytes); + installSnapshotStream.get().orElseThrow().write(bytes); actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(), Runtime.getRuntime().totalMemory()); MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk. MessageCollectorActor.clearMessages(followerActor); - leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true, false, + RaftVersions.CURRENT_VERSION)); MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200); } @Test - public void testInstallSnapshot() throws Exception { + public void testInstallSnapshot() { logStart("testInstallSnapshot"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -847,7 +856,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setNextIndex(0); byte[] bytes = toByteString(leadersSnapshot).toByteArray(); - Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(), lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, @@ -868,7 +877,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testForceInstallSnapshot() throws Exception { + public void testForceInstallSnapshot() { logStart("testForceInstallSnapshot"); final MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -899,7 +908,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.getFollower(FOLLOWER_ID).setNextIndex(-1); byte[] bytes = toByteString(leadersSnapshot).toByteArray(); - Snapshot snapshot = Snapshot.create(ByteState.of(bytes), Collections.emptyList(), + Snapshot snapshot = Snapshot.create(ByteState.of(bytes), List.of(), lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm, -1, null, null); RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, @@ -953,11 +962,11 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString()); ByteString bs = toByteString(leadersSnapshot); - leader.setSnapshot(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, + leader.setSnapshotHolder(new SnapshotHolder(Snapshot.create(ByteState.of(bs.toByteArray()), + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( - actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); + actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName()); fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); while (!fts.isLastChunk(fts.getChunkIndex())) { @@ -983,7 +992,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testSendSnapshotfromInstallSnapshotReply() throws Exception { + public void testSendSnapshotfromInstallSnapshotReply() { logStart("testSendSnapshotfromInstallSnapshotReply"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -995,7 +1004,7 @@ public class LeaderTest extends AbstractLeaderTest { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() { @Override - public int getSnapshotChunkSize() { + public int getMaximumMessageSliceSize() { return 50; } }; @@ -1023,8 +1032,7 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, - -1, null, null); + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null); leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); @@ -1061,7 +1069,7 @@ public class LeaderTest extends AbstractLeaderTest { @Test - public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() { logStart("testHandleInstallSnapshotReplyWithInvalidChunkIndex"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -1073,7 +1081,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setConfigParams(new DefaultConfigParamsImpl() { @Override - public int getSnapshotChunkSize() { + public int getMaximumMessageSliceSize() { return 50; } }); @@ -1097,8 +1105,7 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, - -1, null, null); + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null); Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); @@ -1126,7 +1133,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() throws Exception { + public void testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk() { logStart("testHandleSnapshotSendsPreviousChunksHashCodeWhenSendingNextChunk"); MockRaftActorContext actorContext = createActorContextWithFollower(); @@ -1138,7 +1145,7 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.setConfigParams(new DefaultConfigParamsImpl() { @Override - public int getSnapshotChunkSize() { + public int getMaximumMessageSliceSize() { return 50; } }); @@ -1162,8 +1169,7 @@ public class LeaderTest extends AbstractLeaderTest { ByteString bs = toByteString(leadersSnapshot); Snapshot snapshot = Snapshot.create(ByteState.of(bs.toByteArray()), - Collections.emptyList(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, - -1, null, null); + List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm, -1, null, null); leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot, ByteSource.wrap(bs.toByteArray()))); @@ -1172,8 +1178,8 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(1, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE, - installSnapshot.getLastChunkHashCode().get().intValue()); + assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE), + installSnapshot.getLastChunkHashCode()); final int hashCode = Arrays.hashCode(installSnapshot.getData()); @@ -1186,7 +1192,7 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals(2, installSnapshot.getChunkIndex()); assertEquals(3, installSnapshot.getTotalChunks()); - assertEquals(hashCode, installSnapshot.getLastChunkHashCode().get().intValue()); + assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode()); } @Test @@ -1240,11 +1246,11 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - protected MockRaftActorContext createActorContext(ActorRef actorRef) { + protected MockRaftActorContext createActorContext(final ActorRef actorRef) { return createActorContext(LEADER_ID, actorRef); } - private MockRaftActorContext createActorContext(String id, ActorRef actorRef) { + private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) { DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS)); configParams.setElectionTimeoutFactor(100000); @@ -1256,8 +1262,7 @@ public class LeaderTest extends AbstractLeaderTest { private MockRaftActorContext createActorContextWithFollower() { MockRaftActorContext actorContext = createActorContext(); - actorContext.setPeerAddresses(ImmutableMap.builder().put(FOLLOWER_ID, - followerActor.path().toString()).build()); + actorContext.setPeerAddresses(Map.of(FOLLOWER_ID, followerActor.path().toString())); return actorContext; } @@ -1266,12 +1271,12 @@ public class LeaderTest extends AbstractLeaderTest { DefaultConfigParamsImpl followerConfig = new DefaultConfigParamsImpl(); followerConfig.setElectionTimeoutFactor(10000); followerActorContext.setConfigParams(followerConfig); - followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString())); return followerActorContext; } @Test - public void testLeaderCreatedWithCommitIndexLessThanLastIndex() throws Exception { + public void testLeaderCreatedWithCommitIndexLessThanLastIndex() { logStart("testLeaderCreatedWithCommitIndexLessThanLastIndex"); final MockRaftActorContext leaderActorContext = createActorContextWithFollower(); @@ -1326,13 +1331,13 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() throws Exception { + public void testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex() { logStart("testLeaderCreatedWithCommitIndexLessThanFollowersCommitIndex"); final MockRaftActorContext leaderActorContext = createActorContext(); MockRaftActorContext followerActorContext = createActorContext(FOLLOWER_ID, followerActor); - followerActorContext.setPeerAddresses(ImmutableMap.of(LEADER_ID, leaderActor.path().toString())); + followerActorContext.setPeerAddresses(Map.of(LEADER_ID, leaderActor.path().toString())); Follower follower = new Follower(followerActorContext); followerActor.underlyingActor().setBehavior(follower); @@ -1452,10 +1457,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry index", 1, appendEntries.getEntries().get(0).index()); assertEquals("First entry data", leadersSecondLogEntry.getData(), appendEntries.getEntries().get(0).getData()); - assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).index()); assertEquals("Second entry data", leadersThirdLogEntry.getData(), appendEntries.getEntries().get(1).getData()); @@ -1465,14 +1470,14 @@ public class LeaderTest extends AbstractLeaderTest { List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); ApplyState applyState = applyStateList.get(0); - assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); - assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().index()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().term()); assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(), applyState.getReplicatedLogEntry().getData()); applyState = applyStateList.get(1); - assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex()); - assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().index()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().term()); assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(), applyState.getReplicatedLogEntry().getData()); @@ -1533,10 +1538,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).index()); assertEquals("First entry data", leadersFirstLogEntry.getData(), appendEntries.getEntries().get(0).getData()); - assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).index()); assertEquals("Second entry data", leadersSecondLogEntry.getData(), appendEntries.getEntries().get(1).getData()); @@ -1546,14 +1551,14 @@ public class LeaderTest extends AbstractLeaderTest { List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); ApplyState applyState = applyStateList.get(0); - assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); - assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().index()); + assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().term()); assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), applyState.getReplicatedLogEntry().getData()); applyState = applyStateList.get(1); - assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); - assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().index()); + assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().term()); assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), applyState.getReplicatedLogEntry().getData()); @@ -1615,12 +1620,12 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); - assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm()); + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).index()); + assertEquals("First entry term", 2, appendEntries.getEntries().get(0).term()); assertEquals("First entry data", leadersFirstLogEntry.getData(), appendEntries.getEntries().get(0).getData()); - assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); - assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).index()); + assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).term()); assertEquals("Second entry data", leadersSecondLogEntry.getData(), appendEntries.getEntries().get(1).getData()); @@ -1630,14 +1635,14 @@ public class LeaderTest extends AbstractLeaderTest { List applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2); ApplyState applyState = applyStateList.get(0); - assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex()); - assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().index()); + assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().term()); assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(), applyState.getReplicatedLogEntry().getData()); applyState = applyStateList.get(1); - assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex()); - assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm()); + assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().index()); + assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().term()); assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(), applyState.getReplicatedLogEntry().getData()); @@ -1696,7 +1701,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testHandleAppendEntriesReplySuccess() throws Exception { + public void testHandleAppendEntriesReplySuccess() { logStart("testHandleAppendEntriesReplySuccess"); MockRaftActorContext leaderActorContext = createActorContextWithFollower(); @@ -1713,7 +1718,7 @@ public class LeaderTest extends AbstractLeaderTest { FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID); assertEquals(payloadVersion, leader.getLeaderPayloadVersion()); - assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion()); + assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion()); AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion); @@ -1737,7 +1742,7 @@ public class LeaderTest extends AbstractLeaderTest { ApplyState applyState = applyStateList.get(0); - assertEquals(2, applyState.getReplicatedLogEntry().getIndex()); + assertEquals(2, applyState.getReplicatedLogEntry().index()); assertEquals(2, followerInfo.getMatchIndex()); assertEquals(3, followerInfo.getNextIndex()); @@ -1767,7 +1772,8 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContextWithFollower(); ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(1000, TimeUnit.SECONDS)); - ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(2); + // Note: the size here depends on estimate + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(246); leaderActorContext.setReplicatedLog( new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build()); @@ -1818,10 +1824,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry index", 0, appendEntries.getEntries().get(0).index()); assertEquals("First entry data", leadersFirstLogEntry.getData(), appendEntries.getEntries().get(0).getData()); - assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).index()); assertEquals("Second entry data", leadersSecondLogEntry.getData(), appendEntries.getEntries().get(1).getData()); @@ -1830,10 +1836,10 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex()); assertEquals("Log entries size", 2, appendEntries.getEntries().size()); - assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("First entry index", 2, appendEntries.getEntries().get(0).index()); assertEquals("First entry data", leadersThirdLogEntry.getData(), appendEntries.getEntries().get(0).getData()); - assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex()); + assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).index()); assertEquals("Second entry data", leadersFourthLogEntry.getData(), appendEntries.getEntries().get(1).getData()); @@ -1895,7 +1901,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue("Expected Leader", newBehavior instanceof Leader); } - private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) { + private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) { ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1"); ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2"); @@ -1916,7 +1922,7 @@ public class LeaderTest extends AbstractLeaderTest { assertTrue("Behavior not instance of Leader when all followers are active", newBehavior instanceof Leader); // kill 1 follower and verify if that got killed - final JavaTestKit probe = new JavaTestKit(getSystem()); + final TestKit probe = new TestKit(getSystem()); probe.watch(followerActor1); followerActor1.tell(PoisonPill.getInstance(), ActorRef.noSender()); final Terminated termMsg1 = probe.expectMsgClass(Terminated.class); @@ -1940,7 +1946,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testIsolatedLeaderCheckTwoFollowers() throws Exception { + public void testIsolatedLeaderCheckTwoFollowers() { logStart("testIsolatedLeaderCheckTwoFollowers"); RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(DefaultRaftPolicy.INSTANCE); @@ -1950,7 +1956,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() throws Exception { + public void testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled() { logStart("testIsolatedLeaderCheckTwoFollowersWhenElectionsAreDisabled"); RaftActorBehavior newBehavior = setupIsolatedLeaderCheckTestWithTwoFollowers(createRaftPolicy(false, true)); @@ -1960,7 +1966,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLaggingFollowerStarvation() throws Exception { + public void testLaggingFollowerStarvation() { logStart("testLaggingFollowerStarvation"); String leaderActorId = actorFactory.generateActorId("leader"); @@ -2029,8 +2035,8 @@ public class LeaderTest extends AbstractLeaderTest { leaderActorContext.setLastApplied(-1); String nonVotingFollowerId = "nonvoting-follower"; - TestActorRef nonVotingFollowerActor = actorFactory.createTestActor( - Props.create(MessageCollectorActor.class), actorFactory.generateActorId(nonVotingFollowerId)); + ActorRef nonVotingFollowerActor = actorFactory.createActor( + MessageCollectorActor.props(), actorFactory.generateActorId(nonVotingFollowerId)); leaderActorContext.addToPeers(nonVotingFollowerId, nonVotingFollowerActor.path().toString(), VotingState.NON_VOTING); @@ -2070,7 +2076,7 @@ public class LeaderTest extends AbstractLeaderTest { AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor, AppendEntries.class); assertEquals("Log entries size", 1, appendEntries.getEntries().size()); - assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).index()); // Send reply only from the non-voting follower and verify no consensus via no ApplyState. leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0)); @@ -2112,7 +2118,7 @@ public class LeaderTest extends AbstractLeaderTest { leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); - doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId(); MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0)); @@ -2143,7 +2149,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); - doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2175,7 +2181,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class); - doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId(); + doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId(); leader.transferLeadership(mockTransferCohort); verify(mockTransferCohort, never()).transferComplete(); @@ -2242,7 +2248,7 @@ public class LeaderTest extends AbstractLeaderTest { logStart("testReplicationWithPayloadSizeThatExceedsThreshold"); final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1, - Arrays.asList(new SimpleReplicatedLogEntry(0, 1, + List.of(new SimpleReplicatedLogEntry(0, 1, new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length; final MockRaftActorContext.MockPayload largePayload = new MockRaftActorContext.MockPayload("large", serializedSize); @@ -2250,7 +2256,7 @@ public class LeaderTest extends AbstractLeaderTest { MockRaftActorContext leaderActorContext = createActorContextWithFollower(); ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(300, TimeUnit.MILLISECONDS)); - ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(serializedSize - 50); leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leaderActorContext.setCommitIndex(-1); leaderActorContext.setLastApplied(-1); @@ -2269,7 +2275,7 @@ public class LeaderTest extends AbstractLeaderTest { AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("Entries size", 1, appendEntries.getEntries().size()); - assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).index()); leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0)); assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex()); @@ -2322,7 +2328,7 @@ public class LeaderTest extends AbstractLeaderTest { appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); assertEquals("Entries size", 1, appendEntries.getEntries().size()); - assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex()); + assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).index()); assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit()); } @@ -2334,7 +2340,7 @@ public class LeaderTest extends AbstractLeaderTest { ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( new FiniteDuration(100, TimeUnit.MILLISECONDS)); ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1); - ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(10); leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); leaderActorContext.setCommitIndex(-1); leaderActorContext.setLastApplied(-1); @@ -2349,7 +2355,7 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.clearMessages(followerActor); sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large", - leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1)); + leaderActorContext.getConfigParams().getMaximumMessageSliceSize() + 1)); MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); // Sleep for at least 3 * election timeout so the slicing state expires. @@ -2376,22 +2382,73 @@ public class LeaderTest extends AbstractLeaderTest { MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class); } + @Test + public void testLeaderAddressInAppendEntries() { + logStart("testLeaderAddressInAppendEntries"); + + MockRaftActorContext leaderActorContext = createActorContextWithFollower(); + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval( + FiniteDuration.create(50, TimeUnit.MILLISECONDS)); + leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build()); + leaderActorContext.setCommitIndex(-1); + leaderActorContext.setLastApplied(-1); + + ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver( + peerId -> leaderActor.path().toString()); + + leader = new Leader(leaderActorContext); + leaderActorContext.setCurrentBehavior(leader); + + // Initial heartbeat shouldn't have the leader address + + AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertNull(appendEntries.leaderAddress()); + MessageCollectorActor.clearMessages(followerActor); + + // Send AppendEntriesReply indicating the follower needs the leader address + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true, + RaftVersions.CURRENT_VERSION)); + + // Sleep for the heartbeat interval so AppendEntries is sent. + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertEquals(leaderActor.path().toString(), appendEntries.leaderAddress()); + MessageCollectorActor.clearMessages(followerActor); + + // Send AppendEntriesReply indicating the follower does not need the leader address + + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false, + RaftVersions.CURRENT_VERSION)); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams() + .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE); + + appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class); + assertNull(appendEntries.leaderAddress()); + } + @Override - protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext, - ActorRef actorRef, RaftRPC rpc) throws Exception { + protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext, + final ActorRef actorRef, final RaftRPC rpc) { super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc); assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor()); } - private class MockConfigParamsImpl extends DefaultConfigParamsImpl { + private static class MockConfigParamsImpl extends DefaultConfigParamsImpl { private final long electionTimeOutIntervalMillis; - private final int snapshotChunkSize; + private final int maximumMessageSliceSize; - MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) { - super(); + MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int maximumMessageSliceSize) { this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis; - this.snapshotChunkSize = snapshotChunkSize; + this.maximumMessageSliceSize = maximumMessageSliceSize; } @Override @@ -2400,8 +2457,8 @@ public class LeaderTest extends AbstractLeaderTest { } @Override - public int getSnapshotChunkSize() { - return snapshotChunkSize; + public int getMaximumMessageSliceSize() { + return maximumMessageSliceSize; } } }