Bug 7521: Convert install snapshot chunking to use streams
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 548b920fe771f183b904e2d5e2e65d7001cb746e..12cbcc0df66aa635315436ab04a4cd5e00bfa5cd 100644 (file)
@@ -14,8 +14,8 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.io.ByteSource;
 import com.google.common.io.ByteSource;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -789,35 +789,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
 
                 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
 
-            // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
-
-            byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+            try {
+                // Ensure the snapshot bytes are set - this is a no-op.
+                installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
 
-            log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
-                    nextSnapshotChunk.length);
+                byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
 
-            int nextChunkIndex = installSnapshotState.incrementChunkIndex();
-            Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-            if (installSnapshotState.isLastChunk(nextChunkIndex)) {
-                serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
-            }
+                log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+                        nextSnapshotChunk.length);
 
 
-            followerActor.tell(
-                new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshotHolder.get().getLastIncludedIndex(),
-                    snapshotHolder.get().getLastIncludedTerm(),
-                    nextSnapshotChunk,
-                    nextChunkIndex,
-                    installSnapshotState.getTotalChunks(),
-                    Optional.of(installSnapshotState.getLastChunkHashCode()),
-                    serverConfig
-                ).toSerializable(followerLogInfo.getRaftVersion()),
-                actor()
-            );
+                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+                if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+                }
 
 
-            log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
-                    installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        snapshotHolder.get().getLastIncludedIndex(),
+                        snapshotHolder.get().getLastIncludedTerm(),
+                        nextSnapshotChunk,
+                        nextChunkIndex,
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
+                        serverConfig
+                    ).toSerializable(followerLogInfo.getRaftVersion()),
+                    actor()
+                );
+
+                log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+                        installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+            } catch (IOException e) {
+                throw Throwables.propagate(e);
+            }
         }
     }
 
         }
     }
 
@@ -912,16 +916,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
     static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
-        private final ByteString snapshotBytes;
+        private final ByteSource snapshotBytes;
 
         SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
 
         SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            try {
-                this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
-            } catch (IOException e) {
-                throw new RuntimeException("Error reading state", e);
-            }
+            this.snapshotBytes = snapshotBytes;
         }
 
         long getLastIncludedTerm() {
         }
 
         long getLastIncludedTerm() {
@@ -932,7 +932,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return lastIncludedIndex;
         }
 
             return lastIncludedIndex;
         }
 
-        ByteString getSnapshotBytes() {
+        ByteSource getSnapshotBytes() {
             return snapshotBytes;
         }
     }
             return snapshotBytes;
         }
     }