import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.SerializationUtils;
private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long term, final long index,
final Payload payload) {
- SimpleReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(index, term, payload);
- actorContext.getReplicatedLog().append(newEntry);
- return leader.handleMessage(leaderActor, new Replicate(null, null, newEntry, true));
+ actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(index, term, payload));
+ return leader.handleMessage(leaderActor, new Replicate(index, true, null, null));
}
@Test
actorContext.setLastApplied(0);
- long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
- long term = actorContext.getTermInformation().getCurrentTerm();
- ReplicatedLogEntry newEntry = new SimpleReplicatedLogEntry(
- newLogIndex, term, new MockRaftActorContext.MockPayload("foo"));
+ final long newLogIndex = actorContext.getReplicatedLog().lastIndex() + 1;
+ final long term = actorContext.getTermInformation().getCurrentTerm();
+ final var data = new MockRaftActorContext.MockPayload("foo");
- actorContext.getReplicatedLog().append(newEntry);
+ actorContext.getReplicatedLog().append(new SimpleReplicatedLogEntry(newLogIndex, term, data));
final Identifier id = new MockIdentifier("state-id");
- RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor,
- new Replicate(leaderActor, id, newEntry, true));
+ final var raftBehavior = leader.handleMessage(leaderActor, new Replicate(newLogIndex, true, leaderActor, id));
// State should not change
assertTrue(raftBehavior instanceof Leader);
// We should get 2 ApplyState messages - 1 for new log entry and 1 for the previous
// one since lastApplied state is 0.
- List<ApplyState> applyStateList = MessageCollectorActor.getAllMatching(
- leaderActor, ApplyState.class);
+ final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class);
assertEquals("ApplyState count", newLogIndex, applyStateList.size());
for (int i = 0; i <= newLogIndex - 1; i++) {
}
ApplyState last = applyStateList.get((int) newLogIndex - 1);
- assertEquals("getData", newEntry.getData(), last.getReplicatedLogEntry().getData());
+ assertEquals("getData", data, last.getReplicatedLogEntry().getData());
assertEquals("getIdentifier", id, last.getIdentifier());
}
List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
- actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
// new entry
- SimpleReplicatedLogEntry entry =
- new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
- actorContext.getReplicatedLog().append(entry);
+ actorContext.getReplicatedLog().append(
+ new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
- leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
assertTrue(raftBehavior instanceof Leader);
leader.setSnapshotHolder(null);
// new entry
- SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
- actorContext.getReplicatedLog().append(entry);
+ actorContext.getReplicatedLog().append(
+ new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
assertEquals(2, cs.getLastTerm());
// if an initiate is started again when first is in progress, it shouldnt initiate Capture
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
}
}
// new entry
- SimpleReplicatedLogEntry entry = new SimpleReplicatedLogEntry(newEntryIndex, currentTerm,
- new MockRaftActorContext.MockPayload("D"));
-
- actorContext.getReplicatedLog().append(entry);
+ actorContext.getReplicatedLog().append(
+ new SimpleReplicatedLogEntry(newEntryIndex, currentTerm, new MockRaftActorContext.MockPayload("D")));
//update follower timestamp
leader.markFollowerActive(FOLLOWER_ID);
MessageCollectorActor.clearMessages(followerActor);
// Sending Replicate message should not initiate another capture since the first is in progress.
- leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ leader.handleMessage(leaderActor, new Replicate(newEntryIndex, true, null, new MockIdentifier("state-id")));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
// Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
// Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
final byte[] bytes = new byte[]{1, 2, 3};
- installSnapshotStream.get().get().write(bytes);
+ installSnapshotStream.get().orElseThrow().write(bytes);
actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
Runtime.getRuntime().totalMemory());
MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
- actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while (!fts.isLastChunk(fts.getChunkIndex())) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
};
actorContext.setConfigParams(new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
});
actorContext.setConfigParams(new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
});
assertEquals(1, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE,
- installSnapshot.getLastChunkHashCode().getAsInt());
+ assertEquals(OptionalInt.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE),
+ installSnapshot.getLastChunkHashCode());
final int hashCode = Arrays.hashCode(installSnapshot.getData());
assertEquals(2, installSnapshot.getChunkIndex());
assertEquals(3, installSnapshot.getTotalChunks());
- assertEquals(hashCode, installSnapshot.getLastChunkHashCode().getAsInt());
+ assertEquals(OptionalInt.of(hashCode), installSnapshot.getLastChunkHashCode());
}
@Test
FollowerLogInformation followerInfo = leader.getFollower(FOLLOWER_ID);
assertEquals(payloadVersion, leader.getLeaderPayloadVersion());
- assertEquals(RaftVersions.HELIUM_VERSION, followerInfo.getRaftVersion());
+ assertEquals(RaftVersions.FLUORINE_VERSION, followerInfo.getRaftVersion());
AppendEntriesReply reply = new AppendEntriesReply(FOLLOWER_ID, 1, true, 2, 1, payloadVersion);
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(1000, TimeUnit.SECONDS));
// Note: the size here depends on estimate
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(246);
leaderActorContext.setReplicatedLog(
new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(300, TimeUnit.MILLISECONDS));
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(serializedSize - 50);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leaderActorContext.setCommitIndex(-1);
leaderActorContext.setLastApplied(-1);
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(100, TimeUnit.MILLISECONDS));
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(10);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leaderActorContext.setCommitIndex(-1);
leaderActorContext.setLastApplied(-1);
MessageCollectorActor.clearMessages(followerActor);
sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
- leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+ leaderActorContext.getConfigParams().getMaximumMessageSliceSize() + 1));
MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
// Sleep for at least 3 * election timeout so the slicing state expires.
// Initial heartbeat shouldn't have the leader address
AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertFalse(appendEntries.getLeaderAddress().isPresent());
+ assertNull(appendEntries.leaderAddress());
MessageCollectorActor.clearMessages(followerActor);
// Send AppendEntriesReply indicating the follower needs the leader address
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertTrue(appendEntries.getLeaderAddress().isPresent());
- assertEquals(leaderActor.path().toString(), appendEntries.getLeaderAddress().get());
+ assertEquals(leaderActor.path().toString(), appendEntries.leaderAddress());
MessageCollectorActor.clearMessages(followerActor);
// Send AppendEntriesReply indicating the follower does not need the leader address
leader.handleMessage(leaderActor, SendHeartBeat.INSTANCE);
appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
- assertFalse(appendEntries.getLeaderAddress().isPresent());
+ assertNull(appendEntries.leaderAddress());
}
@Override
private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
private final long electionTimeOutIntervalMillis;
- private final int snapshotChunkSize;
+ private final int maximumMessageSliceSize;
- MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
+ MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int maximumMessageSliceSize) {
this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
- this.snapshotChunkSize = snapshotChunkSize;
+ this.maximumMessageSliceSize = maximumMessageSliceSize;
}
@Override
}
@Override
- public int getSnapshotChunkSize() {
- return snapshotChunkSize;
+ public int getMaximumMessageSliceSize() {
+ return maximumMessageSliceSize;
}
}
}