/**
* This method is called during recovery to reconstruct the state of the actor.
*
- * @param snapshot A snapshot of the state of the actor
+ * @param snapshotBytes A snapshot of the state of the actor
*/
protected abstract void applyRecoverySnapshot(byte[] snapshotBytes);
LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
- //be greedy and remove entries from in-mem journal which are in the snapshot
- // and update snapshotIndex and snapshotTerm without waiting for the success,
+ long dataThreshold = Runtime.getRuntime().totalMemory() *
+ getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+ if (context.getReplicatedLog().dataSize() > dataThreshold) {
+ // if memory is less, clear the log based on lastApplied.
+ // this could/should only happen if one of the followers is down
+ // as normally we keep removing from the log when its replicated to all.
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
+ captureSnapshot.getLastAppliedTerm());
- context.getReplicatedLog().snapshotPreCommit(
- captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
+ } else {
+ // clear the log based on replicatedToAllIndex
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+ captureSnapshot.getReplicatedToAllTerm());
+ }
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
"and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
// FIXME: Maybe this should be done after the command is saved
journal.subList(adjustedIndex , journal.size()).clear();
- persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>(){
+ persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
- @Override public void apply(DeleteEntries param)
- throws Exception {
+ @Override
+ public void apply(DeleteEntries param)
+ throws Exception {
//FIXME : Doing nothing for now
dataSize = 0;
- for(ReplicatedLogEntry entry : journal){
+ for (ReplicatedLogEntry entry : journal) {
dataSize += entry.size();
}
}
appendAndPersist(replicatedLogEntry, null);
}
- @Override
- public int dataSize() {
- return dataSize;
- }
-
public void appendAndPersist(
final ReplicatedLogEntry replicatedLogEntry,
final Procedure<ReplicatedLogEntry> callback) {
long dataSizeForCheck = dataSize;
dataSizeSinceLastSnapshot += logEntrySize;
- long journalSize = lastIndex()+1;
+ long journalSize = lastIndex() + 1;
if(!hasFollowers()) {
// When we do not have followers we do not maintain an in-memory log
}
// send a CaptureSnapshot to self to make the expensive operation async.
- getSelf().tell(new CaptureSnapshot(
- lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
+ long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
+ ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
+ getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
+ (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
null);
context.setSnapshotCaptureInitiated(true);
}
- if(callback != null){
+ if (callback != null){
callback.apply(replicatedLogEntry);
}
}