+ @Test
+ public void testReplicationWithPayloadSizeThatExceedsThreshold() {
+ logStart("testReplicationWithPayloadSizeThatExceedsThreshold");
+
+ final int serializedSize = SerializationUtils.serialize(new AppendEntries(1, LEADER_ID, -1, -1,
+ List.of(new SimpleReplicatedLogEntry(0, 1,
+ new MockRaftActorContext.MockPayload("large"))), 0, -1, (short)0)).length;
+ final MockRaftActorContext.MockPayload largePayload =
+ new MockRaftActorContext.MockPayload("large", serializedSize);
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(300, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send normal payload first to prime commit index.
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ sendReplicate(leaderActorContext, term, 0);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
+ assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Now send a large payload that exceeds the maximum size for a single AppendEntries - it should be sliced.
+ sendReplicate(leaderActorContext, term, 1, largePayload);
+
+ MessageSlice messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 1, messageSlice.getSliceIndex());
+ assertEquals("getTotalSlices", 2, messageSlice.getTotalSlices());
+
+ final Identifier slicingId = messageSlice.getIdentifier();
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
+ assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Initiate a heartbeat - it should send an empty AppendEntries since slicing is in progress.
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Simulate the MessageSliceReply's and AppendEntriesReply from the follower.
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 1, followerActor));
+ messageSlice = MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ assertEquals("getSliceIndex", 2, messageSlice.getSliceIndex());
+
+ leader.handleMessage(followerActor, MessageSliceReply.success(slicingId, 2, followerActor));
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 1, term, (short)0));
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send another normal payload.
+
+ sendReplicate(leaderActorContext, term, 2);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("Entries size", 1, appendEntries.getEntries().size());
+ assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
+ }
+
+ @Test
+ public void testLargePayloadSlicingExpiration() {
+ logStart("testLargePayloadSlicingExpiration");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ new FiniteDuration(100, TimeUnit.MILLISECONDS));
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ final long term = leaderActorContext.getTermInformation().getCurrentTerm();
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Send initial heartbeat reply so follower is marked active
+ MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, -1, true, -1, -1, (short)0));
+ MessageCollectorActor.clearMessages(followerActor);
+
+ sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
+ leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+
+ // Sleep for at least 3 * election timeout so the slicing state expires.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getElectionTimeOutInterval().toMillis() * 3 + 50, TimeUnit.MILLISECONDS);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
+ assertEquals("Entries size", 0, appendEntries.getEntries().size());
+
+ MessageCollectorActor.assertNoneMatching(followerActor, MessageSlice.class, 300);
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send an AppendEntriesReply - this should restart the slicing.
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis() + 50, TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, -1, term, (short)0));
+
+ MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
+ }
+
+ @Test
+ public void testLeaderAddressInAppendEntries() {
+ logStart("testLeaderAddressInAppendEntries");
+
+ MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
+ FiniteDuration.create(50, TimeUnit.MILLISECONDS));
+ leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
+ leaderActorContext.setCommitIndex(-1);
+ leaderActorContext.setLastApplied(-1);
+
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setPeerAddressResolver(
+ peerId -> leaderActor.path().toString());
+
+ leader = new Leader(leaderActorContext);
+ leaderActorContext.setCurrentBehavior(leader);
+
+ // Initial heartbeat shouldn't have the leader address
+
+ AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertFalse(appendEntries.getLeaderAddress().isPresent());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower needs the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, true,
+ RaftVersions.CURRENT_VERSION));
+
+ // Sleep for the heartbeat interval so AppendEntries is sent.
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertTrue(appendEntries.getLeaderAddress().isPresent());
+ assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().orElseThrow());
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Send AppendEntriesReply indicating the follower does not need the leader address
+
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, -1, -1, (short)0, false, false,
+ RaftVersions.CURRENT_VERSION));
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams()
+ .getHeartBeatInterval().toMillis(), TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
+
+ appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+ assertFalse(appendEntries.getLeaderAddress().isPresent());
+ }
+