Bug 5740: Remove Serializable where not necessary
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 79f2ce9b4ca0fb2b0deca496eed67e069bc1a547..71ef8ffcea04360b6db573de04d5344d44b3cf7c 100644 (file)
@@ -10,13 +10,21 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 
 /**
@@ -48,10 +56,10 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Runnable createSnapshotProcedure;
+    private Consumer<Optional<OutputStream>> createSnapshotProcedure;
 
     private ApplySnapshot applySnapshot;
-    private Consumer<byte[]> applySnapshotProcedure;
+    private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE;
 
     /**
      * Constructs an instance.
@@ -89,8 +97,9 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(final byte[] snapshotBytes, final long totalMemory) {
-        currentState.persist(snapshotBytes, totalMemory);
+    public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
+            final long totalMemory) {
+        currentState.persist(state, installSnapshotStream, totalMemory);
     }
 
     @Override
@@ -108,12 +117,17 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
+    void setCreateSnapshotConsumer(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
-        this.applySnapshotProcedure = applySnapshotProcedure;
+    void setSnapshotCohort(final RaftActorSnapshotCohort snapshotCohort) {
+        this.snapshotCohort = snapshotCohort;
+    }
+
+    @Nonnull
+    public Snapshot.State convertSnapshot(ByteSource snapshotBytes) throws IOException {
+        return snapshotCohort.deserializeSnapshot(snapshotBytes);
     }
 
     public long getLastSequenceNumber() {
@@ -138,11 +152,9 @@ public class SnapshotManager implements SnapshotState {
      *
      * @param lastLogEntry the last log entry for the snapshot.
      * @param replicatedToAllIndex the index of the last entry replicated to all followers.
-     * @param installSnapshotInitiated true if snapshot is initiated to install on a follower.
      * @return a new CaptureSnapshot instance.
      */
-    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
-            boolean installSnapshotInitiated) {
+    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
         TermInformationReader lastAppliedTermInfoReader =
                 lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
                         lastLogEntry, hasFollowers());
@@ -169,7 +181,7 @@ public class SnapshotManager implements SnapshotState {
         }
 
         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
-                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated);
+                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries);
     }
 
     private class AbstractSnapshotState implements SnapshotState {
@@ -198,7 +210,8 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
+                final long totalMemory) {
             log.debug("persist should not be called in state {}", this);
         }
 
@@ -261,9 +274,11 @@ public class SnapshotManager implements SnapshotState {
 
         @SuppressWarnings("checkstyle:IllegalCatch")
         private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
-            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
+            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex);
 
-            if (captureSnapshot.isInstallSnapshotInitiated()) {
+            OutputStream installSnapshotStream = null;
+            if (targetFollower != null) {
+                installSnapshotStream = context.newFileBackedOutputStream();
                 log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
@@ -277,7 +292,7 @@ public class SnapshotManager implements SnapshotState {
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.run();
+                createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream));
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 log.error("Error creating snapshot", e);
@@ -325,11 +340,12 @@ public class SnapshotManager implements SnapshotState {
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        public void persist(final Snapshot.State snapshotState, final Optional<OutputStream> installSnapshotStream,
+                final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-            Snapshot snapshot = Snapshot.create(snapshotBytes,
+            Snapshot snapshot = Snapshot.create(snapshotState,
                     captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
@@ -393,10 +409,19 @@ public class SnapshotManager implements SnapshotState {
                     context.getId(), context.getReplicatedLog().getSnapshotIndex(),
                     context.getReplicatedLog().getSnapshotTerm());
 
-            if (context.getId().equals(currentBehavior.getLeaderId())
-                    && captureSnapshot.isInstallSnapshotInitiated()) {
-                // this would be call straight to the leader and won't initiate in serialization
-                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot));
+            if (installSnapshotStream.isPresent()) {
+                if (context.getId().equals(currentBehavior.getLeaderId())) {
+                    try {
+                        ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource();
+                        currentBehavior.handleMessage(context.getActor(),
+                                new SendInstallSnapshot(snapshot, snapshotBytes));
+                    } catch (IOException e) {
+                        log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
+                                context.getId(), e);
+                    }
+                } else {
+                    ((FileBackedOutputStream)installSnapshotStream.get()).cleanup();
+                }
             }
 
             captureSnapshot = null;
@@ -431,8 +456,8 @@ public class SnapshotManager implements SnapshotState {
                         context.updatePeerIds(snapshot.getServerConfiguration());
                     }
 
-                    if (snapshot.getState().length > 0 ) {
-                        applySnapshotProcedure.accept(snapshot.getState());
+                    if (!(snapshot.getState() instanceof EmptyState)) {
+                        snapshotCohort.applySnapshot(snapshot.getState());
                     }
 
                     applySnapshot.getCallback().onSuccess();