private CaptureSnapshot captureSnapshot;
private long lastSequenceNumber = -1;
- private Consumer<Optional<OutputStream>> createSnapshotProcedure;
+ private Consumer<Optional<OutputStream>> createSnapshotProcedure = null;
private ApplySnapshot applySnapshot;
private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE;
return currentState.capture(lastLogEntry, replicatedToAllIndex);
}
+ @Override
+ public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
+ return currentState.captureWithForcedTrim(lastLogEntry, replicatedToAllIndex);
+ }
+
@Override
public void apply(final ApplySnapshot snapshot) {
currentState.apply(snapshot);
* @param replicatedToAllIndex the index of the last entry replicated to all followers.
* @return a new CaptureSnapshot instance.
*/
- public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
+ public CaptureSnapshot newCaptureSnapshot(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
+ final boolean mandatoryTrim) {
TermInformationReader lastAppliedTermInfoReader =
lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
lastLogEntry, hasFollowers());
List<ReplicatedLogEntry> unAppliedEntries = context.getReplicatedLog().getFrom(lastAppliedIndex + 1);
- long lastLogEntryIndex = lastAppliedIndex;
- long lastLogEntryTerm = lastAppliedTerm;
- if (lastLogEntry != null) {
+ final long lastLogEntryIndex;
+ final long lastLogEntryTerm;
+ if (lastLogEntry == null) {
+ // When we don't have journal present, for example two captureSnapshots executed right after another with no
+ // new journal we still want to preserve the index and term in the snapshot.
+ lastAppliedIndex = lastLogEntryIndex = context.getReplicatedLog().getSnapshotIndex();
+ lastAppliedTerm = lastLogEntryTerm = context.getReplicatedLog().getSnapshotTerm();
+
+ log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using snapshot values lastAppliedIndex {} and "
+ + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm);
+ } else {
lastLogEntryIndex = lastLogEntry.getIndex();
lastLogEntryTerm = lastLogEntry.getTerm();
- } else {
- log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and "
- + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm);
}
return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
- newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries);
+ newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, mandatoryTrim);
}
private class AbstractSnapshotState implements SnapshotState {
return false;
}
+ @Override
+ public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
+ log.debug("captureWithForcedTrim should not be called in state {}", this);
+ return false;
+ }
+
@Override
public void apply(final ApplySnapshot snapshot) {
log.debug("apply should not be called in state {}", this);
@SuppressWarnings("checkstyle:IllegalCatch")
private boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
- final String targetFollower) {
- captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex);
+ final String targetFollower, final boolean mandatoryTrim) {
+ captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, mandatoryTrim);
OutputStream installSnapshotStream = null;
if (targetFollower != null) {
@Override
public boolean capture(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
- return capture(lastLogEntry, replicatedToAllIndex, null);
+ return capture(lastLogEntry, replicatedToAllIndex, null, false);
}
@Override
public boolean captureToInstall(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex,
final String targetFollower) {
- return capture(lastLogEntry, replicatedToAllIndex, targetFollower);
+ return capture(lastLogEntry, replicatedToAllIndex, targetFollower, false);
+ }
+
+ @Override
+ public boolean captureWithForcedTrim(final ReplicatedLogEntry lastLogEntry, final long replicatedToAllIndex) {
+ return capture(lastLogEntry, replicatedToAllIndex, null, true);
}
@Override
context.getReplicatedLog().size() >= context.getConfigParams().getSnapshotBatchCount();
final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
- if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
+ if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount || captureSnapshot.isMandatoryTrim()) {
if (log.isDebugEnabled()) {
if (dataSizeThresholdExceeded) {
log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit "
+ "with index {}", context.getId(), context.getReplicatedLog().dataSize(),
dataThreshold, captureSnapshot.getLastAppliedIndex());
- } else {
+ } else if (logSizeExceededSnapshotBatchCount) {
log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
+ "index {}", context.getId(), context.getReplicatedLog().size(),
context.getConfigParams().getSnapshotBatchCount(),
captureSnapshot.getLastAppliedIndex());
+ } else {
+ log.debug("{}: user triggered or root overwrite snapshot encountered, trimming log up to "
+ + "last applied index {}", context.getId(), captureSnapshot.getLastAppliedIndex());
}
}
context.getReplicatedLog().snapshotCommit();
}
- context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(sequenceNumber,
+ context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
timeStamp - 1, 0L, 0L));
context.getPersistenceProvider().deleteMessages(lastSequenceNumber);