Bug 7521: Add FileBackedOutputStream and use for snapshot chunking
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / SnapshotManager.java
index 12508aebffedc10195ad6c3bd482a1905c23fa7a..71ef8ffcea04360b6db573de04d5344d44b3cf7c 100644 (file)
@@ -11,13 +11,13 @@ package org.opendaylight.controller.cluster.raft;
 import akka.persistence.SnapshotSelectionCriteria;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteSource;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Consumer;
 import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -278,7 +278,7 @@ public class SnapshotManager implements SnapshotState {
 
             OutputStream installSnapshotStream = null;
             if (targetFollower != null) {
-                installSnapshotStream = new ByteArrayOutputStream();
+                installSnapshotStream = context.newFileBackedOutputStream();
                 log.info("{}: Initiating snapshot capture {} to install on {}",
                         persistenceId(), captureSnapshot, targetFollower);
             } else {
@@ -410,18 +410,17 @@ public class SnapshotManager implements SnapshotState {
                     context.getReplicatedLog().getSnapshotTerm());
 
             if (installSnapshotStream.isPresent()) {
-                try {
-                    installSnapshotStream.get().close();
-                } catch (IOException e) {
-                    log.warn("Error closing install snapshot OutputStream", e);
-                }
-
                 if (context.getId().equals(currentBehavior.getLeaderId())) {
-                    ByteSource snapshotBytes = ByteSource.wrap(((ByteArrayOutputStream)installSnapshotStream.get())
-                            .toByteArray());
-
-                    // this would be call straight to the leader and won't initiate in serialization
-                    currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes));
+                    try {
+                        ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource();
+                        currentBehavior.handleMessage(context.getActor(),
+                                new SendInstallSnapshot(snapshot, snapshotBytes));
+                    } catch (IOException e) {
+                        log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
+                                context.getId(), e);
+                    }
+                } else {
+                    ((FileBackedOutputStream)installSnapshotStream.get()).cleanup();
                 }
             }