- LOG.info("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
- // create a snapshot object from the state provided and save it
- // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
- Snapshot sn = Snapshot.create(snapshotBytes,
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
- captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
- captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
- persistence().saveSnapshot(sn);
-
- LOG.info("{}: Persisting of snapshot done:{}", persistenceId(), sn.getLogMessage());
-
- long dataThreshold = Runtime.getRuntime().totalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
- if (context.getReplicatedLog().dataSize() > dataThreshold) {
- // if memory is less, clear the log based on lastApplied.
- // this could/should only happen if one of the followers is down
- // as normally we keep removing from the log when its replicated to all.
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
-
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
- // clear the log based on replicatedToAllIndex
- context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
- captureSnapshot.getReplicatedToAllTerm());
-
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
- } else {
- // The replicatedToAllIndex was not found in the log
- // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
- // In this scenario we may need to save the snapshot to the akka persistence
- // snapshot for recovery but we do not need to do the replicated log trimming.
- context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
- replicatedLog.getSnapshotTerm());
- }
-
-
- LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
- "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());