}
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
}
*
* @return the maximum size (in bytes).
*/
- int getSnapshotChunkSize();
+ int getMaximumMessageSliceSize();
/**
* Returns the maximum number of journal log entries to batch on recovery before applying.
*/
private static final int ELECTION_TIME_MAX_VARIANCE = 100;
- private static final int SNAPSHOT_CHUNK_SIZE = 480 * 1024; // 480KiB
+ private static final int MAXIMUM_MESSAGE_SLICE_SIZE = 480 * 1024; // 480KiB
/**
// 0 means direct threshold if disabled
private int snapshotDataThreshold = 0;
- private int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
+ private int maximumMessageSliceSize = MAXIMUM_MESSAGE_SLICE_SIZE;
private long electionTimeoutFactor = 2;
private long candidateElectionTimeoutDivisor = 1;
this.snapshotDataThreshold = snapshotDataThreshold;
}
- public void setSnapshotChunkSize(final int snapshotChunkSize) {
- this.snapshotChunkSize = snapshotChunkSize;
+ public void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
+ this.maximumMessageSliceSize = maximumMessageSliceSize;
}
public void setJournalRecoveryLogBatchSize(final int journalRecoveryLogBatchSize) {
}
@Override
- public int getSnapshotChunkSize() {
- return snapshotChunkSize;
+ public int getMaximumMessageSliceSize() {
+ return maximumMessageSliceSize;
}
@Override
super(context, state);
appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
- .messageSliceSize(context.getConfigParams().getSnapshotChunkSize())
+ .messageSliceSize(context.getConfigParams().getMaximumMessageSliceSize())
.expireStateAfterInactivity(context.getConfigParams().getElectionTimeOutInterval().toMillis() * 3,
TimeUnit.MILLISECONDS).build();
// Try to get all the entries in the journal but not exceeding the max data size for a single AppendEntries
// message.
int maxEntries = (int) context.getReplicatedLog().size();
- final int maxDataSize = context.getConfigParams().getSnapshotChunkSize();
+ final int maxDataSize = context.getConfigParams().getMaximumMessageSliceSize();
final long followerNextIndex = followerLogInfo.getNextIndex();
List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex,
maxEntries, maxDataSize);
getReplicatedToAllIndex(), followerId);
if (captureInitiated) {
followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
- context.getConfigParams().getSnapshotChunkSize(), logName()));
+ context.getConfigParams().getMaximumMessageSliceSize(), logName()));
}
return captureInitiated;
if (snapshotHolder.isPresent()) {
LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
if (installSnapshotState == null) {
- installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
- logName());
+ installSnapshotState = new LeaderInstallSnapshotState(
+ context.getConfigParams().getMaximumMessageSliceSize(), logName());
followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
}
}
// FIXME: this is an arbitrary limit. Document interactions and/or improve them to improve maintainability
- protected static final int SNAPSHOT_CHUNK_SIZE = 700;
+ protected static final int MAXIMUM_MESSAGE_SLICE_SIZE = 700;
protected final Logger testLog = LoggerFactory.getLogger(getClass());
protected long currentTerm;
protected int snapshotBatchCount = 4;
- protected int snapshotChunkSize = SNAPSHOT_CHUNK_SIZE;
+ protected int maximumMessageSliceSize = MAXIMUM_MESSAGE_SLICE_SIZE;
protected List<MockPayload> expSnapshotState = new ArrayList<>();
configParams.setSnapshotBatchCount(snapshotBatchCount);
configParams.setSnapshotDataThresholdPercentage(70);
configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- configParams.setSnapshotChunkSize(snapshotChunkSize);
+ configParams.setMaximumMessageSliceSize(maximumMessageSliceSize);
return configParams;
}
assertEquals("Persisted Snapshot getUnAppliedEntries size", 0, unAppliedEntry.size());
int snapshotSize = SerializationUtils.serialize(persistedSnapshot.getState()).length;
- final int expTotalChunks = snapshotSize / SNAPSHOT_CHUNK_SIZE
- + (snapshotSize % SNAPSHOT_CHUNK_SIZE > 0 ? 1 : 0);
+ final int expTotalChunks = snapshotSize / MAXIMUM_MESSAGE_SLICE_SIZE
+ + (snapshotSize % MAXIMUM_MESSAGE_SLICE_SIZE > 0 ? 1 : 0);
InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(follower2CollectorActor,
InstallSnapshot.class);
// Create the leader and 2 follower actors.
- snapshotChunkSize = 20;
+ maximumMessageSliceSize = 20;
DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
followerConfigParams.setSnapshotBatchCount(snapshotBatchCount);
// Send a large payload that exceeds the size threshold and needs to be sliced.
- MockPayload largePayload = sendPayloadData(leaderActor, "large", snapshotChunkSize + 1);
+ MockPayload largePayload = sendPayloadData(leaderActor, "large", maximumMessageSliceSize + 1);
// Then send a small payload that does not need to be sliced.
- MockPayload smallPayload = sendPayloadData(leaderActor, "normal", snapshotChunkSize - 1);
+ MockPayload smallPayload = sendPayloadData(leaderActor, "normal", maximumMessageSliceSize - 1);
final List<ApplyState> leaderApplyState = expectMatching(leaderCollectorActor, ApplyState.class, 2);
verifyApplyState(leaderApplyState.get(0), leaderCollectorActor,
List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
- actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
List.of(), commitIndex, snapshotTerm, commitIndex, snapshotTerm,
-1, null, null), ByteSource.wrap(bs.toByteArray())));
LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
- actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+ actorContext.getConfigParams().getMaximumMessageSliceSize(), leader.logName());
fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
while (!fts.isLastChunk(fts.getChunkIndex())) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
};
actorContext.setConfigParams(new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
});
actorContext.setConfigParams(new DefaultConfigParamsImpl() {
@Override
- public int getSnapshotChunkSize() {
+ public int getMaximumMessageSliceSize() {
return 50;
}
});
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(1000, TimeUnit.SECONDS));
// Note: the size here depends on estimate
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(246);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(246);
leaderActorContext.setReplicatedLog(
new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 4, 1).build());
MockRaftActorContext leaderActorContext = createActorContextWithFollower();
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(300, TimeUnit.MILLISECONDS));
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(serializedSize - 50);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(serializedSize - 50);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leaderActorContext.setCommitIndex(-1);
leaderActorContext.setLastApplied(-1);
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
new FiniteDuration(100, TimeUnit.MILLISECONDS));
((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setElectionTimeoutFactor(1);
- ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setSnapshotChunkSize(10);
+ ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setMaximumMessageSliceSize(10);
leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
leaderActorContext.setCommitIndex(-1);
leaderActorContext.setLastApplied(-1);
MessageCollectorActor.clearMessages(followerActor);
sendReplicate(leaderActorContext, term, 0, new MockRaftActorContext.MockPayload("large",
- leaderActorContext.getConfigParams().getSnapshotChunkSize() + 1));
+ leaderActorContext.getConfigParams().getMaximumMessageSliceSize() + 1));
MessageCollectorActor.expectFirstMatching(followerActor, MessageSlice.class);
// Sleep for at least 3 * election timeout so the slicing state expires.
private static class MockConfigParamsImpl extends DefaultConfigParamsImpl {
private final long electionTimeOutIntervalMillis;
- private final int snapshotChunkSize;
+ private final int maximumMessageSliceSize;
- MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
+ MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int maximumMessageSliceSize) {
this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
- this.snapshotChunkSize = snapshotChunkSize;
+ this.maximumMessageSliceSize = maximumMessageSliceSize;
}
@Override
}
@Override
- public int getSnapshotChunkSize() {
- return snapshotChunkSize;
+ public int getMaximumMessageSliceSize() {
+ return maximumMessageSliceSize;
}
}
}
setCandidateElectionTimeoutDivisor(other.raftConfig.getCandidateElectionTimeoutDivisor());
setCustomRaftPolicyImplementation(other.raftConfig.getCustomRaftPolicyImplementationClass());
setMaximumMessageSliceSize(other.getMaximumMessageSliceSize());
- setShardSnapshotChunkSize(other.raftConfig.getSnapshotChunkSize());
setPeerAddressResolver(other.raftConfig.getPeerAddressResolver());
setTempFileDirectory(other.getTempFileDirectory());
setFileBackedStreamingThreshold(other.getFileBackedStreamingThreshold());
raftConfig.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
}
- @Deprecated
- private void setShardSnapshotChunkSize(final int shardSnapshotChunkSize) {
- // We'll honor the shardSnapshotChunkSize setting for backwards compatibility but only if it doesn't exceed
- // maximumMessageSliceSize.
- if (shardSnapshotChunkSize < maximumMessageSliceSize) {
- raftConfig.setSnapshotChunkSize(shardSnapshotChunkSize);
- }
- }
-
private void setMaximumMessageSliceSize(final int maximumMessageSliceSize) {
- raftConfig.setSnapshotChunkSize(maximumMessageSliceSize);
+ raftConfig.setMaximumMessageSliceSize(maximumMessageSliceSize);
this.maximumMessageSliceSize = maximumMessageSliceSize;
}
return this;
}
- @Deprecated
- public Builder shardSnapshotChunkSize(final int shardSnapshotChunkSize) {
- LOG.warn("The shard-snapshot-chunk-size configuration parameter is deprecated - "
- + "use maximum-message-slice-size instead");
- datastoreContext.setShardSnapshotChunkSize(shardSnapshotChunkSize);
- return this;
- }
-
public Builder maximumMessageSliceSize(final int maximumMessageSliceSize) {
datastoreContext.setMaximumMessageSliceSize(maximumMessageSliceSize);
return this;
the distributed datastore provider implementation";
revision "2023-12-29" {
- description "Remote use-tell-based-protocol leaf";
+ description "Remote use-tell-based-protocol and shard-snapshot-chunk-size leaves";
}
revision "2014-06-12" {
cannot be found then the default raft behavior will be applied";
}
- leaf shard-snapshot-chunk-size {
- status deprecated;
- default 491520;
- type non-zero-uint32-type;
- description "When sending a snapshot to a follower, this is the maximum size in bytes for
- a chunk of data.";
- }
-
leaf maximum-message-slice-size {
default 491520;
type non-zero-uint32-type;