assertEquals("getPrevLogIndex", lastIndex - 1, appendEntries.getPrevLogIndex());
assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
assertEquals("Entries size", 1, appendEntries.getEntries().size());
- assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).getIndex());
- assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry getIndex", lastIndex, appendEntries.getEntries().get(0).index());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).term());
assertEquals("getPayloadVersion", payloadVersion, appendEntries.getPayloadVersion());
}
assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
assertEquals("Entries size", 1, appendEntries.getEntries().size());
- assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
- assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).index());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).term());
assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
assertEquals("Commit Index", lastIndex, actorContext.getCommitIndex());
}
assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
assertEquals("getPrevLogTerm", prevTerm, appendEntries.getPrevLogTerm());
assertEquals("Entries size", 1, appendEntries.getEntries().size());
- assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).getIndex());
- assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry getIndex", newIndex, appendEntries.getEntries().get(0).index());
+ assertEquals("Entry getTerm", newTerm, appendEntries.getEntries().get(0).term());
assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
// The follower replies with success. The leader should now update the commit index to the new index
assertEquals("getPrevLogIndex", lastIndex, appendEntries.getPrevLogIndex());
assertEquals("getPrevLogTerm", term, appendEntries.getPrevLogTerm());
assertEquals("Entries size", 1, appendEntries.getEntries().size());
- assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).getIndex());
- assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("Entry getIndex", lastIndex + 1, appendEntries.getEntries().get(0).index());
+ assertEquals("Entry getTerm", term, appendEntries.getEntries().get(0).term());
assertEquals("Entry payload", "foo", appendEntries.getEntries().get(0).getData().toString());
assertEquals("Commit Index", lastIndex + 1, actorContext.getCommitIndex());
}
final List<ReplicatedLogEntry> entries = req.getEntries();
assertEquals(1, entries.size());
- assertEquals(messageNr + 2, entries.get(0).getIndex());
+ assertEquals(messageNr + 2, entries.get(0).index());
}
@Test
assertEquals("The number of append entries collected should be 2", 2, allMessages.size());
assertEquals(1, allMessages.get(0).getEntries().size());
- assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
+ assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).index());
assertEquals(1, allMessages.get(1).getEntries().size());
- assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).getIndex());
+ // FIXME: weird assert
+ assertEquals(lastIndex + 1, allMessages.get(0).getEntries().get(0).index());
}
for (int i = 0; i <= newLogIndex - 1; i++) {
ApplyState applyState = applyStateList.get(i);
- assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().getIndex());
- assertEquals("getTerm", term, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("getIndex", i + 1, applyState.getReplicatedLogEntry().index());
+ assertEquals("getTerm", term, applyState.getReplicatedLogEntry().term());
}
ApplyState last = applyStateList.get((int) newLogIndex - 1);
List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
- actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
- actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while (!fts.isLastChunk(fts.getChunkIndex())) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
};
actorContext.setConfigParams(new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
});
actorContext.setConfigParams(new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
});
assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
assertEquals("Log entries size", 2, appendEntries.getEntries().size());
- assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry index", 1, appendEntries.getEntries().get(0).index());
assertEquals("First entry data", leadersSecondLogEntry.getData(),
appendEntries.getEntries().get(0).getData());
- assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry index", 2, appendEntries.getEntries().get(1).index());
assertEquals("Second entry data", leadersThirdLogEntry.getData(),
appendEntries.getEntries().get(1).getData());
List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
ApplyState applyState = applyStateList.get(0);
- assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
- assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState index", 1, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().term());
assertEquals("Follower's first ApplyState data", leadersSecondLogEntry.getData(),
applyState.getReplicatedLogEntry().getData());
applyState = applyStateList.get(1);
- assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().getIndex());
- assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState index", 2, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().term());
assertEquals("Follower's second ApplyState data", leadersThirdLogEntry.getData(),
applyState.getReplicatedLogEntry().getData());
assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
assertEquals("Log entries size", 2, appendEntries.getEntries().size());
- assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).index());
assertEquals("First entry data", leadersFirstLogEntry.getData(),
appendEntries.getEntries().get(0).getData());
- assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).index());
assertEquals("Second entry data", leadersSecondLogEntry.getData(),
appendEntries.getEntries().get(1).getData());
List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
ApplyState applyState = applyStateList.get(0);
- assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
- assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's first ApplyState term", 1, applyState.getReplicatedLogEntry().term());
assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
applyState.getReplicatedLogEntry().getData());
applyState = applyStateList.get(1);
- assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
- assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's second ApplyState term", 1, applyState.getReplicatedLogEntry().term());
assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
applyState.getReplicatedLogEntry().getData());
assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
assertEquals("Log entries size", 2, appendEntries.getEntries().size());
- assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
- assertEquals("First entry term", 2, appendEntries.getEntries().get(0).getTerm());
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).index());
+ assertEquals("First entry term", 2, appendEntries.getEntries().get(0).term());
assertEquals("First entry data", leadersFirstLogEntry.getData(),
appendEntries.getEntries().get(0).getData());
- assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
- assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).getTerm());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).index());
+ assertEquals("Second entry term", 2, appendEntries.getEntries().get(1).term());
assertEquals("Second entry data", leadersSecondLogEntry.getData(),
appendEntries.getEntries().get(1).getData());
List<ApplyState> applyStateList = MessageCollectorActor.expectMatching(followerActor, ApplyState.class, 2);
ApplyState applyState = applyStateList.get(0);
- assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().getIndex());
- assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's first ApplyState index", 0, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's first ApplyState term", 2, applyState.getReplicatedLogEntry().term());
assertEquals("Follower's first ApplyState data", leadersFirstLogEntry.getData(),
applyState.getReplicatedLogEntry().getData());
applyState = applyStateList.get(1);
- assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().getIndex());
- assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().getTerm());
+ assertEquals("Follower's second ApplyState index", 1, applyState.getReplicatedLogEntry().index());
+ assertEquals("Follower's second ApplyState term", 2, applyState.getReplicatedLogEntry().term());
assertEquals("Follower's second ApplyState data", leadersSecondLogEntry.getData(),
applyState.getReplicatedLogEntry().getData());
ApplyState applyState = applyStateList.get(0);
- assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+ assertEquals(2, applyState.getReplicatedLogEntry().index());
assertEquals(2, followerInfo.getMatchIndex());
assertEquals(3, followerInfo.getNextIndex());
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(1000, TimeUnit.SECONDS));
// Note: the size here depends on estimate
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(246);
leaderActorContext.setReplicatedLog(
new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
assertEquals("Log entries size", 2, appendEntries.getEntries().size());
- assertEquals("First entry index", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry index", 0, appendEntries.getEntries().get(0).index());
assertEquals("First entry data", leadersFirstLogEntry.getData(),
appendEntries.getEntries().get(0).getData());
- assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry index", 1, appendEntries.getEntries().get(1).index());
assertEquals("Second entry data", leadersSecondLogEntry.getData(),
appendEntries.getEntries().get(1).getData());
assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
assertEquals("Log entries size", 2, appendEntries.getEntries().size());
- assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("First entry index", 2, appendEntries.getEntries().get(0).index());
assertEquals("First entry data", leadersThirdLogEntry.getData(),
appendEntries.getEntries().get(0).getData());
- assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).getIndex());
+ assertEquals("Second entry index", 3, appendEntries.getEntries().get(1).index());
assertEquals("Second entry data", leadersFourthLogEntry.getData(),
appendEntries.getEntries().get(1).getData());
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(nonVotingFollowerActor,
AppendEntries.class);
assertEquals("Log entries size", 1, appendEntries.getEntries().size());
- assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Log entry index", 1, appendEntries.getEntries().get(0).index());
// Send reply only from the non-voting follower and verify no consensus via no ApplyState.
leader.handleMessage(leaderActor, new AppendEntriesReply(nonVotingFollowerId, 1, true, 1, 1, (short)0));
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(300, TimeUnit.MILLISECONDS));
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(serializedSize - 50);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leaderActorContext.setCommitIndex(-1);
leaderActorContext.setLastApplied(-1);
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
assertEquals("Entries size", 1, appendEntries.getEntries().size());
- assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getIndex", 0, appendEntries.getEntries().get(0).index());
leader.handleMessage(followerActor, new AppendEntriesReply(FOLLOWER_ID, term, true, 0, term, (short)0));
assertEquals("getCommitIndex", 0, leaderActorContext.getCommitIndex());
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
assertEquals("Entries size", 1, appendEntries.getEntries().size());
- assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).getIndex());
+ assertEquals("Entry getIndex", 2, appendEntries.getEntries().get(0).index());
assertEquals("getLeaderCommit", 1, appendEntries.getLeaderCommit());
}
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(100, TimeUnit.MILLISECONDS));
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(10);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leaderActorContext.setCommitIndex(-1);
leaderActorContext.setLastApplied(-1);
MessageCollectorActor.clearMessages(followerActor);
sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
- leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+ leaderActorContext.getConfigParams().getMaximumMessageSliceSize() + 1));
MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
// Sleep for at least 3 * election timeout so the slicing state expires.
// Initial heartbeat shouldn't have the leader address
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertFalse(appendEntries.getLeaderAddress().isPresent());
+ assertNull(appendEntries.leaderAddress());
MessageCollectorActor.clearMessages(followerActor);
// Send AppendEntriesReply indicating the follower needs the leader address
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertTrue(appendEntries.getLeaderAddress().isPresent());
- assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().orElseThrow());
+ assertEquals(leaderActor.path().toString(), appendEntries.leaderAddress());
MessageCollectorActor.clearMessages(followerActor);
// Send AppendEntriesReply indicating the follower does not need the leader address
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertFalse(appendEntries.getLeaderAddress().isPresent());
+ assertNull(appendEntries.leaderAddress());
}
@Override
private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
private final long electionTimeOutIntervalMillis;
- private final int snapshotChunkSize;
+ private final int maximumMessageSliceSize;
- MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
+ MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int maximumMessageSliceSize) {
this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
- this.snapshotChunkSize = snapshotChunkSize;
+ this.maximumMessageSliceSize = maximumMessageSliceSize;
}
@Override
}
@Override
- public int getSnapshotChunkSize() {
- return snapshotChunkSize;
+ public int getMaximumMessageSliceSize() {
+ return maximumMessageSliceSize;
}
}
}