private final Procedure<Void> createSnapshotProcedure = new Procedure<Void>() {
@Override
- public void apply(Void notUsed) throws Exception {
+ public void apply(Void notUsed) {
cohort.createSnapshot(context.getActor());
}
};
+ private final Procedure<byte[]> applySnapshotProcedure = new Procedure<byte[]>() {
+ @Override
+ public void apply(byte[] state) {
+ cohort.applySnapshot(state);
+ }
+ };
+
RaftActorSnapshotMessageSupport(RaftActorContext context, RaftActorBehavior currentBehavior,
RaftActorSnapshotCohort cohort) {
this.context = context;
this.log = context.getLogger();
context.getSnapshotManager().setCreateSnapshotCallable(createSnapshotProcedure);
+ context.getSnapshotManager().setApplySnapshotProcedure(applySnapshotProcedure);
}
boolean handleSnapshotMessage(Object message) {
onCaptureSnapshotReply(((CaptureSnapshotReply) message).getSnapshot());
return true;
} else if (message.equals(COMMIT_SNAPSHOT)) {
- context.getSnapshotManager().commit(-1);
+ context.getSnapshotManager().commit(-1, currentBehavior);
return true;
} else {
return false;
long sequenceNumber = success.metadata().sequenceNr();
- context.getSnapshotManager().commit(sequenceNumber);
+ context.getSnapshotManager().commit(sequenceNumber, currentBehavior);
}
private void onApplySnapshot(Snapshot snapshot) {
- if(log.isDebugEnabled()) {
- log.debug("{}: ApplySnapshot called on Follower Actor " +
- "snapshotIndex:{}, snapshotTerm:{}", context.getId(), snapshot.getLastAppliedIndex(),
- snapshot.getLastAppliedTerm());
- }
-
- cohort.applySnapshot(snapshot.getState());
+ log.info("{}: Applying snapshot on follower with snapshotIndex: {}, snapshotTerm: {}", context.getId(),
+ snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm());
- //clears the followers log, sets the snapshot index to ensure adjusted-index works
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(snapshot, context, currentBehavior));
- context.setLastApplied(snapshot.getLastAppliedIndex());
+ context.getSnapshotManager().apply(snapshot);
}
}