final var lastSeq = context.getPersistenceProvider().getLastSequenceNumber();
final var snapshotState = snapshotCohort.takeSnapshot();
LOG.debug("{}: captured snapshot at lastSequenceNumber: {}", memberId(), lastSeq);
- persist(lastSeq, request, snapshotState, null);
+ persist(lastSeq, request, snapshotState);
return true;
}
*/
@VisibleForTesting
public void persist(final Snapshot.State snapshotState, final @Nullable OutputStream installSnapshotStream) {
- if (task instanceof Capture(var lastSeq, var request)) {
- persist(lastSeq, request, snapshotState, installSnapshotStream);
- } else {
+ if (!(task instanceof Capture(var lastSeq, var request))) {
LOG.debug("{}: persist should not be called in state {}", memberId(), task);
+ return;
+ }
+
+ persist(lastSeq, request, snapshotState);
+
+ if (installSnapshotStream != null) {
+ // FIXME: this should not be necessary
+ if (!(installSnapshotStream instanceof FileBackedOutputStream snapshotStream)) {
+ throw new VerifyException("Unexpected stream " + installSnapshotStream);
+ }
+
+ final var currentBehavior = context.getCurrentBehavior();
+ if (!memberId().equals(currentBehavior.getLeaderId())) {
+ snapshotStream.cleanup();
+ return;
+ }
+
+ final ByteSource bytes;
+ try {
+ bytes = snapshotStream.asByteSource();
+ } catch (IOException e) {
+ LOG.error("{}: Snapshot install failed due to an unrecoverable streaming error", memberId(), e);
+ return;
+ }
+
+ currentBehavior.handleMessage(context.getActor(),
+ new SnapshotBytes(request.getLastAppliedIndex(), request.getLastAppliedTerm(), bytes));
}
}
- private void persist(final long lastSeq, final CaptureSnapshot request, final Snapshot.State snapshotState,
- final @Nullable OutputStream installSnapshotStream) {
+ private void persist(final long lastSeq, final CaptureSnapshot request, final Snapshot.State snapshotState) {
// create a snapshot object from the state provided and save it when snapshot is saved async,
// SaveSnapshotSuccess is raised.
final var snapshot = Snapshot.create(snapshotState, request.getUnAppliedEntries(),
LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} and term: {}", memberId(),
replLog.getSnapshotIndex(), replLog.getSnapshotTerm());
- if (installSnapshotStream != null) {
- // FIXME: this should not be necessary
- if (!(installSnapshotStream instanceof FileBackedOutputStream snapshotStream)) {
- throw new VerifyException("Unexpected stream " + installSnapshotStream);
- }
-
- if (memberId().equals(currentBehavior.getLeaderId())) {
- try {
- final var bytes = snapshotStream.asByteSource();
- currentBehavior.handleMessage(context.getActor(),
- new SnapshotBytes(request.getLastAppliedIndex(), request.getLastAppliedTerm(), bytes));
- } catch (IOException e) {
- LOG.error("{}: Snapshot install failed due to an unrecoverable streaming error", memberId(), e);
- }
- } else {
- snapshotStream.cleanup();
- }
- }
-
task = new PersistCapture(lastSeq, request);
}