*/
public SnapshotManager(final RaftActorContext context, final Logger logger) {
this.context = context;
- this.log = logger;
+ log = logger;
}
public boolean isApplying() {
//use the term of the temp-min, since we check for isPresent, entry will not be null
ReplicatedLogEntry entry = context.getReplicatedLog().get(tempMin);
context.getReplicatedLog().snapshotPreCommit(tempMin, entry.getTerm());
- context.getReplicatedLog().snapshotCommit();
+ context.getReplicatedLog().snapshotCommit(false);
return tempMin;
}
log.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
- SnapshotManager.this.currentState = CREATING;
+ currentState = CREATING;
try {
createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream));
} catch (Exception e) {
- SnapshotManager.this.currentState = IDLE;
+ currentState = IDLE;
log.error("Error creating snapshot", e);
return false;
}
@Override
public void apply(final ApplySnapshot toApply) {
- SnapshotManager.this.applySnapshot = toApply;
+ applySnapshot = toApply;
lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
context.getPersistenceProvider().saveSnapshot(toApply.getSnapshot());
- SnapshotManager.this.currentState = PERSISTING;
+ currentState = PERSISTING;
}
@Override
log.info("{}: Persisting of snapshot done: {}", persistenceId(), snapshot);
- long dataThreshold = totalMemory * context.getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold;
+ final ConfigParams config = context.getConfigParams();
+ final long absoluteThreshold = config.getSnapshotDataThreshold();
+ final long dataThreshold = absoluteThreshold != 0 ? absoluteThreshold * ConfigParams.MEGABYTE
+ : totalMemory * config.getSnapshotDataThresholdPercentage() / 100;
- boolean logSizeExceededSnapshotBatchCount =
- context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
+ final boolean dataSizeThresholdExceeded = context.getReplicatedLog().dataSize() > dataThreshold;
+ final boolean logSizeExceededSnapshotBatchCount =
+ context.getReplicatedLog().size() >= config.getSnapshotBatchCount();
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || captureSnapshot.isMandatoryTrim()) {
} else if (logSizeExceededSnapshotBatchCount) {
log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
+ "index {}", context.getId(), context.getReplicatedLog().size(),
- context.getConfigParams().getSnapshotBatchCount(),
- captureSnapshot.getLastAppliedIndex());
+ config.getSnapshotBatchCount(), captureSnapshot.getLastAppliedIndex());
} else {
log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to "
+ "last applied index {}", context.getId(), captureSnapshot.getLastAppliedIndex());
if (installSnapshotStream.isPresent()) {
if (context.getId().equals(currentBehavior.getLeaderId())) {
try {
- ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource();
+ ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.orElseThrow())
+ .asByteSource();
currentBehavior.handleMessage(context.getActor(),
new SendInstallSnapshot(snapshot, snapshotBytes));
} catch (IOException e) {
context.getId(), e);
}
} else {
- ((FileBackedOutputStream)installSnapshotStream.get()).cleanup();
+ ((FileBackedOutputStream)installSnapshotStream.orElseThrow()).cleanup();
}
}
captureSnapshot = null;
- SnapshotManager.this.currentState = PERSISTING;
+ currentState = PERSISTING;
}
@Override
private void snapshotComplete() {
lastSequenceNumber = -1;
applySnapshot = null;
- SnapshotManager.this.currentState = IDLE;
+ currentState = IDLE;
context.getActor().tell(SnapshotComplete.INSTANCE, context.getActor());
}
LastAppliedTermInformationReader init(final ReplicatedLog log, final long originalIndex,
final ReplicatedLogEntry lastLogEntry, final boolean hasFollowers) {
ReplicatedLogEntry entry = log.get(originalIndex);
- this.index = -1L;
- this.term = -1L;
+ index = -1L;
+ term = -1L;
if (!hasFollowers) {
if (lastLogEntry != null) {
// since we have persisted the last-log-entry to persistent journal before the capture,
@Override
public long getIndex() {
- return this.index;
+ return index;
}
@Override
public long getTerm() {
- return this.term;
+ return term;
}
}
ReplicatedToAllTermInformationReader init(final ReplicatedLog log, final long originalIndex) {
ReplicatedLogEntry entry = log.get(originalIndex);
- this.index = -1L;
- this.term = -1L;
+ index = -1L;
+ term = -1L;
if (entry != null) {
index = entry.getIndex();
@Override
public long getIndex() {
- return this.index;
+ return index;
}
@Override
public long getTerm() {
- return this.term;
+ return term;
}
}
}