- LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
- "and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
+ // Don't reset replicatedToAllIndex to -1 as this may prevent us from trimming the log after an
+ // install snapshot to a follower.
+ if(captureSnapshot.getReplicatedToAllIndex() >= 0) {
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ }
+ } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
+ // clear the log based on replicatedToAllIndex
+ context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
+ captureSnapshot.getReplicatedToAllTerm());
+
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else {
+ // The replicatedToAllIndex was not found in the log
+ // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+ // In this scenario we may need to save the snapshot to the akka persistence
+ // snapshot for recovery but we do not need to do the replicated log trimming.
+ context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm());
+ }
+
+
+ LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} " +
+ "and term: {}", persistenceId(), replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm());