Move SnapshotBytes propagation 94/115994/2
authorRobert Varga <robert.varga@pantheon.tech>
Sat, 22 Mar 2025 12:57:41 +0000 (13:57 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Sat, 22 Mar 2025 13:04:50 +0000 (14:04 +0100)
We no longer need a Snapshot, hence we can move the propagation logic to
the single method which is invoking it.

JIRA: CONTROLLER-2134
Change-Id: I87544711d1a69825fa920f23c2893d58f757c753
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java

index 636875c6cfd932cec239398504646881169c182e..37c01b3e8acab871863d062e9cbc5a499558ead6 100644 (file)
@@ -248,7 +248,7 @@ public final class SnapshotManager {
         final var lastSeq = context.getPersistenceProvider().getLastSequenceNumber();
         final var snapshotState = snapshotCohort.takeSnapshot();
         LOG.debug("{}: captured snapshot at lastSequenceNumber: {}", memberId(), lastSeq);
-        persist(lastSeq, request, snapshotState, null);
+        persist(lastSeq, request, snapshotState);
         return true;
     }
 
@@ -317,15 +317,39 @@ public final class SnapshotManager {
      */
     @VisibleForTesting
     public void persist(final Snapshot.State snapshotState, final @Nullable OutputStream installSnapshotStream) {
-        if (task instanceof Capture(var lastSeq, var request)) {
-            persist(lastSeq, request, snapshotState, installSnapshotStream);
-        } else {
+        if (!(task instanceof Capture(var lastSeq, var request))) {
             LOG.debug("{}: persist should not be called in state {}", memberId(), task);
+            return;
+        }
+
+        persist(lastSeq, request, snapshotState);
+
+        if (installSnapshotStream != null) {
+            // FIXME: this should not be necessary
+            if (!(installSnapshotStream instanceof FileBackedOutputStream snapshotStream)) {
+                throw new VerifyException("Unexpected stream " + installSnapshotStream);
+            }
+
+            final var currentBehavior = context.getCurrentBehavior();
+            if (!memberId().equals(currentBehavior.getLeaderId())) {
+                snapshotStream.cleanup();
+                return;
+            }
+
+            final ByteSource bytes;
+            try {
+                bytes = snapshotStream.asByteSource();
+            } catch (IOException e) {
+                LOG.error("{}: Snapshot install failed due to an unrecoverable streaming error", memberId(), e);
+                return;
+            }
+
+            currentBehavior.handleMessage(context.getActor(),
+                new SnapshotBytes(request.getLastAppliedIndex(), request.getLastAppliedTerm(), bytes));
         }
     }
 
-    private void persist(final long lastSeq, final CaptureSnapshot request, final Snapshot.State snapshotState,
-            final @Nullable OutputStream installSnapshotStream) {
+    private void persist(final long lastSeq, final CaptureSnapshot request, final Snapshot.State snapshotState) {
         // create a snapshot object from the state provided and save it when snapshot is saved async,
         // SaveSnapshotSuccess is raised.
         final var snapshot = Snapshot.create(snapshotState, request.getUnAppliedEntries(),
@@ -392,25 +416,6 @@ public final class SnapshotManager {
         LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex: {} and term: {}", memberId(),
             replLog.getSnapshotIndex(), replLog.getSnapshotTerm());
 
-        if (installSnapshotStream != null) {
-            // FIXME: this should not be necessary
-            if (!(installSnapshotStream instanceof FileBackedOutputStream snapshotStream)) {
-                throw new VerifyException("Unexpected stream " + installSnapshotStream);
-            }
-
-            if (memberId().equals(currentBehavior.getLeaderId())) {
-                try {
-                    final var bytes = snapshotStream.asByteSource();
-                    currentBehavior.handleMessage(context.getActor(),
-                        new SnapshotBytes(request.getLastAppliedIndex(), request.getLastAppliedTerm(), bytes));
-                } catch (IOException e) {
-                    LOG.error("{}: Snapshot install failed due to an unrecoverable streaming error", memberId(), e);
-                }
-            } else {
-                snapshotStream.cleanup();
-            }
-        }
-
         task = new PersistCapture(lastSeq, request);
     }