import akka.persistence.SnapshotSelectionCriteria;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteSource;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
OutputStream installSnapshotStream = null;
if (targetFollower != null) {
- installSnapshotStream = new ByteArrayOutputStream();
+ installSnapshotStream = context.newFileBackedOutputStream();
log.info("{}: Initiating snapshot capture {} to install on {}",
persistenceId(), captureSnapshot, targetFollower);
} else {
context.getReplicatedLog().getSnapshotTerm());
if (installSnapshotStream.isPresent()) {
- try {
- installSnapshotStream.get().close();
- } catch (IOException e) {
- log.warn("Error closing install snapshot OutputStream", e);
- }
-
if (context.getId().equals(currentBehavior.getLeaderId())) {
- ByteSource snapshotBytes = ByteSource.wrap(((ByteArrayOutputStream)installSnapshotStream.get())
- .toByteArray());
-
- // this would be call straight to the leader and won't initiate in serialization
- currentBehavior.handleMessage(context.getActor(), new SendInstallSnapshot(snapshot, snapshotBytes));
+ try {
+ ByteSource snapshotBytes = ((FileBackedOutputStream)installSnapshotStream.get()).asByteSource();
+ currentBehavior.handleMessage(context.getActor(),
+ new SendInstallSnapshot(snapshot, snapshotBytes));
+ } catch (IOException e) {
+ log.error("{}: Snapshot install failed due to an unrecoverable streaming error",
+ context.getId(), e);
+ }
+ } else {
+ ((FileBackedOutputStream)installSnapshotStream.get()).cleanup();
}
}