Bug 5740: Remove Serializable where not necessary
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 5db4706c623e3a8219554bcb40f08e584e2bda71..71ef8ffcea04360b6db573de04d5344d44b3cf7c 100644 (file)
@@ -10,13 +10,21 @@ package org.opendaylight.controller.cluster.raft;
 
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.EmptyState;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.slf4j.Logger;
 
 /**
@@ -27,8 +35,13 @@ import org.slf4j.Logger;
  */
 public class SnapshotManager implements SnapshotState {
 
+    @SuppressWarnings("checkstyle:MemberName")
     private final SnapshotState IDLE = new Idle();
+
+    @SuppressWarnings({"checkstyle:MemberName", "checkstyle:AbbreviationAsWordInName"})
     private final SnapshotState PERSISTING = new Persisting();
+
+    @SuppressWarnings({"checkstyle:MemberName", "checkstyle:AbbreviationAsWordInName"})
     private final SnapshotState CREATING = new Creating();
 
     private final Logger log;
@@ -43,10 +56,10 @@ public class SnapshotManager implements SnapshotState {
     private CaptureSnapshot captureSnapshot;
     private long lastSequenceNumber = -1;
 
-    private Runnable createSnapshotProcedure;
+    private Consumer<Optional<OutputStream>> createSnapshotProcedure;
 
     private ApplySnapshot applySnapshot;
-    private Consumer<byte[]> applySnapshotProcedure;
+    private RaftActorSnapshotCohort snapshotCohort = NoopRaftActorSnapshotCohort.INSTANCE;
 
     /**
      * Constructs an instance.
@@ -84,8 +97,9 @@ public class SnapshotManager implements SnapshotState {
     }
 
     @Override
-    public void persist(final byte[] snapshotBytes, final long totalMemory) {
-        currentState.persist(snapshotBytes, totalMemory);
+    public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
+            final long totalMemory) {
+        currentState.persist(state, installSnapshotStream, totalMemory);
     }
 
     @Override
@@ -103,12 +117,17 @@ public class SnapshotManager implements SnapshotState {
         return currentState.trimLog(desiredTrimIndex);
     }
 
-    public void setCreateSnapshotRunnable(Runnable createSnapshotProcedure) {
+    void setCreateSnapshotConsumer(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
         this.createSnapshotProcedure = createSnapshotProcedure;
     }
 
-    public void setApplySnapshotConsumer(Consumer<byte[]> applySnapshotProcedure) {
-        this.applySnapshotProcedure = applySnapshotProcedure;
+    void setSnapshotCohort(final RaftActorSnapshotCohort snapshotCohort) {
+        this.snapshotCohort = snapshotCohort;
+    }
+
+    @Nonnull
+    public Snapshot.State convertSnapshot(ByteSource snapshotBytes) throws IOException {
+        return snapshotCohort.deserializeSnapshot(snapshotBytes);
     }
 
     public long getLastSequenceNumber() {
@@ -133,11 +152,9 @@ public class SnapshotManager implements SnapshotState {
      *
      * @param lastLogEntry the last log entry for the snapshot.
      * @param replicatedToAllIndex the index of the last entry replicated to all followers.
-     * @param installSnapshotInitiated true if snapshot is initiated to install on a follower.
      * @return a new CaptureSnapshot instance.
      */
-    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex,
-            boolean installSnapshotInitiated) {
+    public CaptureSnapshot newCaptureSnapshot(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex) {
         TermInformationReader lastAppliedTermInfoReader =
                 lastAppliedTermInformationReader.init(context.getReplicatedLog(), context.getLastApplied(),
                         lastLogEntry, hasFollowers());
@@ -159,12 +176,12 @@ public class SnapshotManager implements SnapshotState {
             lastLogEntryIndex = lastLogEntry.getIndex();
             lastLogEntryTerm = lastLogEntry.getTerm();
         } else {
-            log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
-                    persistenceId(), lastAppliedIndex, lastAppliedTerm);
+            log.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and "
+                    + "lastAppliedTerm {} instead.", persistenceId(), lastAppliedIndex, lastAppliedTerm);
         }
 
         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
-                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries, installSnapshotInitiated);
+                newReplicatedToAllIndex, newReplicatedToAllTerm, unAppliedEntries);
     }
 
     private class AbstractSnapshotState implements SnapshotState {
@@ -193,7 +210,8 @@ public class SnapshotManager implements SnapshotState {
         }
 
         @Override
-        public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        public void persist(final Snapshot.State state, final Optional<OutputStream> installSnapshotStream,
+                final long totalMemory) {
             log.debug("persist should not be called in state {}", this);
         }
 
@@ -254,10 +272,13 @@ public class SnapshotManager implements SnapshotState {
             return false;
         }
 
+        @SuppressWarnings("checkstyle:IllegalCatch")
         private boolean capture(ReplicatedLogEntry lastLogEntry, long replicatedToAllIndex, String targetFollower) {
-            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex, targetFollower != null);
+            captureSnapshot = newCaptureSnapshot(lastLogEntry, replicatedToAllIndex);
 
-            if (captureSnapshot.isInstallSnapshotInitiated()) {
+            OutputStream installSnapshotStream = null;
+            if (targetFollower != null) {
+                installSnapshotStream = context.newFileBackedOutputStream();
                 log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
@@ -271,7 +292,7 @@ public class SnapshotManager implements SnapshotState {
             SnapshotManager.this.currentState = CREATING;
 
             try {
-                createSnapshotProcedure.run();
+                createSnapshotProcedure.accept(Optional.ofNullable(installSnapshotStream));
             } catch (Exception e) {
                 SnapshotManager.this.currentState = IDLE;
                 log.error("Error creating snapshot", e);
@@ -319,11 +340,12 @@ public class SnapshotManager implements SnapshotState {
     private class Creating extends AbstractSnapshotState {
 
         @Override
-        public void persist(final byte[] snapshotBytes, final long totalMemory) {
+        public void persist(final Snapshot.State snapshotState, final Optional<OutputStream> installSnapshotStream,
+                final long totalMemory) {
             // create a snapshot object from the state provided and save it
             // when snapshot is saved async, SaveSnapshotSuccess is raised.
 
-            Snapshot snapshot = Snapshot.create(snapshotBytes,
+            Snapshot snapshot = Snapshot.create(snapshotState,
                     captureSnapshot.getUnAppliedEntries(),
                     captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
                     captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm(),
@@ -344,12 +366,12 @@ public class SnapshotManager implements SnapshotState {
             if (dataSizeThresholdExceeded || logSizeExceededSnapshotBatchCount) {
                 if (log.isDebugEnabled()) {
                     if (dataSizeThresholdExceeded) {
-                        log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit with index {}",
-                                context.getId(), context.getReplicatedLog().dataSize(), dataThreshold,
-                                captureSnapshot.getLastAppliedIndex());
+                        log.debug("{}: log data size {} exceeds the memory threshold {} - doing snapshotPreCommit "
+                                + "with index {}", context.getId(), context.getReplicatedLog().dataSize(),
+                                dataThreshold, captureSnapshot.getLastAppliedIndex());
                     } else {
-                        log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with index {}",
-                                context.getId(), context.getReplicatedLog().size(),
+                        log.debug("{}: log size {} exceeds the snapshot batch count {} - doing snapshotPreCommit with "
+                                + "index {}", context.getId(), context.getReplicatedLog().size(),
                                 context.getConfigParams().getSnapshotBatchCount(),
                                 captureSnapshot.getLastAppliedIndex());
                     }
@@ -387,10 +409,19 @@ public class SnapshotManager implements SnapshotState {
                     context.getId(), context.getReplicatedLog().getSnapshotIndex(),
                     context.getReplicatedLog().getSnapshotTerm());
 
-            if (context.getId().equals(currentBehavior.getLeaderId())
-                    && captureSnapshot.isInstallSnapshotInitiated()) {
-                // this would be call straight to the leader and won't initiate in serialization
-                currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot));
+            if (installSnapshotStream.isPresent()) {
+                if (context.getId().equals(currentBehavior.getLeaderId())) {
+                    try {
+                        ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource();
+                        currentBehavior.handleMessage(context.getActor(),
+                                new SendInstallSnapshot(snapshot, snapshotBytes));
+                    } catch (IOException e) {
+                        log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
+                                context.getId(), e);
+                    }
+                } else {
+                    ((FileBackedOutputStream)installSnapshotStream.get()).cleanup();
+                }
             }
 
             captureSnapshot = null;
@@ -407,6 +438,7 @@ public class SnapshotManager implements SnapshotState {
     private class Persisting extends AbstractSnapshotState {
 
         @Override
+        @SuppressWarnings("checkstyle:IllegalCatch")
         public void commit(final long sequenceNumber, long timeStamp) {
             log.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
@@ -424,8 +456,8 @@ public class SnapshotManager implements SnapshotState {
                         context.updatePeerIds(snapshot.getServerConfiguration());
                     }
 
-                    if (snapshot.getState().length > 0 ) {
-                        applySnapshotProcedure.accept(snapshot.getState());
+                    if (!(snapshot.getState() instanceof EmptyState)) {
+                        snapshotCohort.applySnapshot(snapshot.getState());
                     }
 
                     applySnapshot.getCallback().onSuccess();