Bug 8206: Prevent decr follower next index beyong -1
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index fab1714989a99407f3d51452d33ef313ac2c14e8..4382cfec3237075087837b5f1402cadab249188f 100644 (file)
@@ -14,7 +14,9 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,8 +35,8 @@ import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot;
@@ -46,11 +48,13 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
  * The behavior of a RaftActor when it is in the Leader state
- * <p/>
+ *
+ * <p>
  * Leaders:
  * <ul>
  * <li> Upon election: send initial empty AppendEntries RPCs
@@ -64,7 +68,7 @@ import scala.concurrent.duration.FiniteDuration;
  * follower (§5.3)
  * <li> If AppendEntries fails because of log inconsistency:
  * decrement nextIndex and retry (§5.3)
- * <li> If there exists an N such that N > commitIndex, a majority
+ * <li> If there exists an N such that N &gt; commitIndex, a majority
  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
  * set commitIndex = N (§5.3, §5.4).
  * </ul>
@@ -81,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
     private Cancellable heartbeatSchedule = null;
-    private Optional<SnapshotHolder> snapshot = Optional.absent();
+    private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state,
@@ -90,7 +94,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
-            snapshot = initializeFromLeader.snapshot;
+            snapshotHolder = initializeFromLeader.snapshotHolder;
             trackers.addAll(initializeFromLeader.trackers);
         } else {
             for (PeerInfo peerInfo: context.getPeers()) {
@@ -163,17 +167,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    void setSnapshot(@Nullable Snapshot snapshot) {
-        if (snapshot != null) {
-            this.snapshot = Optional.of(new SnapshotHolder(snapshot));
-        } else {
-            this.snapshot = Optional.absent();
-        }
+    void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+        this.snapshotHolder = Optional.fromNullable(snapshotHolder);
     }
 
     @VisibleForTesting
     boolean hasSnapshot() {
-        return snapshot.isPresent();
+        return snapshotHolder.isPresent();
     }
 
     @Override
@@ -227,9 +227,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // snapshot. It's also possible that the follower's last log index is behind the leader's.
             // However in this case the log terms won't match and the logs will conflict - this is handled
             // elsewhere.
-            log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
-                    + "forcing install snaphot", logName(), followerLogInformation.getId(),
-                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
+            log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+                    + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
+                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+                    context.getReplicatedLog().getSnapshotIndex());
 
             followerLogInformation.setMatchIndex(-1);
             followerLogInformation.setNextIndex(-1);
@@ -251,7 +252,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInformation.setNextIndex(followerLastLogIndex - 1);
                 updated = true;
 
-                log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+                log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
                         + "leader's {} - set the follower's next index to {}", logName(),
                         followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
                         followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
@@ -259,7 +260,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
             }
         } else {
-            log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+                    logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
 
             if (appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
@@ -277,30 +279,61 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // follower's last log index.
 
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+                log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+                    + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+                    appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(),
+                    followerLogInformation.getNextIndex());
             } else {
                 // The follower's log conflicts with leader's log so decrement follower's next index by 1
                 // in an attempt to find where the logs match.
 
-                followerLogInformation.decrNextIndex();
-                updated = true;
+                if (followerLogInformation.decrNextIndex()) {
+                    updated = true;
 
-                log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
-                        logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
-                        followerLogInformation.getNextIndex());
+                    log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+                            logName(), followerId, appendEntriesReply.getLogLastTerm(),
+                            followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+                }
             }
         }
 
-        // Now figure out if this reply warrants a change in the commitIndex
-        // If there exists an N such that N > commitIndex, a majority
-        // of matchIndex[i] ≥ N, and log[N].term == currentTerm:
-        // set commitIndex = N (§5.3, §5.4).
         if (log.isTraceEnabled()) {
             log.trace("{}: handleAppendEntriesReply from {}: commitIndex: {}, lastAppliedIndex: {}, currentTerm: {}",
                     logName(), followerId, context.getCommitIndex(), context.getLastApplied(), currentTerm());
         }
 
+        possiblyUpdateCommitIndex();
+
+        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
+        return this;
+    }
+
+    private void possiblyUpdateCommitIndex() {
+        // Figure out if we can update the the commitIndex as follows:
+        //   If there exists an index N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
+        //     and log[N].term == currentTerm:
+        //   set commitIndex = N (§5.3, §5.4).
         for (long index = context.getCommitIndex() + 1; ; index++) {
-            int replicatedCount = 1;
+            ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
+            if (replicatedLogEntry == null) {
+                log.trace("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
+                        logName(), index, context.getReplicatedLog().getSnapshotIndex(),
+                        context.getReplicatedLog().size());
+                break;
+            }
+
+            // Count our entry if it has been persisted.
+            int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
+
+            if (replicatedCount == 0) {
+                // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+                // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+                // amongst the followers w/o the local persistence ack.
+                break;
+            }
 
             log.trace("{}: checking Nth index {}", logName(), index);
             for (FollowerLogInformation info : followerToLog.values()) {
@@ -319,14 +352,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if (replicatedCount >= minReplicationCount) {
-                ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(index);
-                if (replicatedLogEntry == null) {
-                    log.debug("{}: ReplicatedLogEntry not found for index {} - snapshotIndex: {}, journal size: {}",
-                            logName(), index, context.getReplicatedLog().getSnapshotIndex(),
-                            context.getReplicatedLog().size());
-                    break;
-                }
-
                 // Don't update the commit index if the log entry is from a previous term, as per §5.4.1:
                 // "Raft never commits log entries from previous terms by counting replicas".
                 // However we keep looping so we can make progress when new entries in the current term
@@ -348,11 +373,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         // Apply the change to the state machine
         if (context.getCommitIndex() > context.getLastApplied()) {
-            if (log.isDebugEnabled()) {
-                log.debug(
-                    "{}: handleAppendEntriesReply from {}: applying to log - commitIndex: {}, lastAppliedIndex: {}",
-                    logName(), followerId, context.getCommitIndex(), context.getLastApplied());
-            }
+            log.debug("{}: Applying to log - commitIndex: {}, lastAppliedIndex: {}", logName(),
+                    context.getCommitIndex(), context.getLastApplied());
 
             applyLogToStateMachine(context.getCommitIndex());
         }
@@ -360,11 +382,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         if (!context.getSnapshotManager().isCapturing()) {
             purgeInMemoryLog();
         }
-
-        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
-        sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
-
-        return this;
     }
 
     private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
@@ -426,7 +443,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (§5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+                log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
@@ -440,13 +457,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         } else if (message instanceof SendInstallSnapshot) {
-            // received from RaftActor
-            setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+            SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+            setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
             sendInstallSnapshot();
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
         } else if (message instanceof InstallSnapshotReply) {
             handleInstallSnapshotReply((InstallSnapshotReply) message);
+        } else if (message instanceof CheckConsensusReached) {
+            possiblyUpdateCommitIndex();
         } else {
             return super.handleMessage(sender, message);
         }
@@ -480,18 +499,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             if (reply.isSuccess()) {
                 if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
-                    log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
-                            + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
-                            context.getReplicatedLog().getSnapshotIndex() + 1);
 
-                    long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+                    long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     followerLogInformation.clearLeaderInstallSnapshotState();
 
-                    log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
-                        logName(), followerId, followerLogInformation.getMatchIndex(),
-                        followerLogInformation.getNextIndex());
+                    log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+                        + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+                        followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
 
                     if (!anyFollowersInstallingSnapshot()) {
                         // once there are no pending followers receiving snapshots
@@ -510,8 +526,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     installSnapshotState.markSendStatus(true);
                 }
             } else {
-                log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
-                        logName(), reply.getChunkIndex());
+                log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
 
                 installSnapshotState.markSendStatus(false);
             }
@@ -553,8 +568,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void replicate(Replicate replicate) {
         long logIndex = replicate.getReplicatedLogEntry().getIndex();
 
-        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}", logName(),
-                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass());
+        log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
+                replicate.getIdentifier(), logIndex, replicate.getReplicatedLogEntry().getData().getClass(),
+                replicate.isSendImmediate());
 
         // Create a tracker entry we will use this later to notify the
         // client actor
@@ -571,7 +587,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             applyLogToStateMachine(logIndex);
         }
 
-        if (!followerToLog.isEmpty()) {
+        if (replicate.isSendImmediate() && !followerToLog.isEmpty()) {
             sendAppendEntries(0, false);
         }
     }
@@ -592,8 +608,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     /**
      * This method checks if any update needs to be sent to the given follower. This includes append log entries,
      * sending next snapshot chunk, and initiating a snapshot.
-     *
-     * @return true if any update is sent, false otherwise
      */
     private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
                                        boolean sendHeartbeat, boolean isHeartbeat) {
@@ -643,17 +657,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
 
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
-                                    + "follower-nextIndex: %d, leader-snapshot-index: %d,  "
-                                    + "leader-last-index: %d", logName(), followerId,
-                                    followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
-                    }
-
                     // Send heartbeat to follower whenever install snapshot is initiated.
                     sendAppendEntries = true;
                     if (canInstallSnapshot(followerNextIndex)) {
+                        log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+                                + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+                                context.getReplicatedLog().size());
+
                         initiateCaptureSnapshot(followerId);
+                    } else {
+                        // It doesn't seem like we should ever reach here - most likely indicates sonething is
+                        // wrong.
+                        log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+                                + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+                                followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+                                context.getReplicatedLog().size());
                     }
 
                 } else if (sendHeartbeat) {
@@ -699,7 +718,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     /**
      * Initiates a snapshot capture to install on a follower.
-     * <p/>
+     *
+     * <p>
      * Install Snapshot works as follows
      *   1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
      *   2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
@@ -717,7 +737,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     public boolean initiateCaptureSnapshot(String followerId) {
         FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             // If a snapshot is present in the memory, most likely another install is in progress no need to capture
             // snapshot. This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -725,16 +745,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
             sendSnapshotChunk(followerActor, followerLogInfo);
             return true;
-        } else {
-            boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
-                    this.getReplicatedToAllIndex(), followerId);
-            if (captureInitiated) {
-                followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
-                        context.getConfigParams().getSnapshotChunkSize(), logName()));
-            }
+        }
 
-            return captureInitiated;
+        boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+            this.getReplicatedToAllIndex(), followerId);
+        if (captureInitiated) {
+            followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+                context.getConfigParams().getSnapshotChunkSize(), logName()));
         }
+
+        return captureInitiated;
     }
 
     private boolean canInstallSnapshot(long nextIndex) {
@@ -770,7 +790,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *  InstallSnapshot should qualify as a heartbeat too.
      */
     private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
             if (installSnapshotState == null) {
                 installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
@@ -778,35 +798,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
 
-            // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
-
-            byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+            try {
+                // Ensure the snapshot bytes are set - this is a no-op.
+                installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
-            log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
-                    nextSnapshotChunk.length);
+                byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
-            int nextChunkIndex = installSnapshotState.incrementChunkIndex();
-            Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-            if (installSnapshotState.isLastChunk(nextChunkIndex)) {
-                serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
-            }
+                log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+                        nextSnapshotChunk.length);
 
-            followerActor.tell(
-                new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshot.get().getLastIncludedIndex(),
-                    snapshot.get().getLastIncludedTerm(),
-                    nextSnapshotChunk,
-                    nextChunkIndex,
-                    installSnapshotState.getTotalChunks(),
-                    Optional.of(installSnapshotState.getLastChunkHashCode()),
-                    serverConfig
-                ).toSerializable(followerLogInfo.getRaftVersion()),
-                actor()
-            );
+                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+                if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+                }
 
-            log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
-                    installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        snapshotHolder.get().getLastIncludedIndex(),
+                        snapshotHolder.get().getLastIncludedTerm(),
+                        nextSnapshotChunk,
+                        nextChunkIndex,
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
+                        serverConfig
+                    ).toSerializable(followerLogInfo.getRaftVersion()),
+                    actor()
+                );
+
+                log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+                        installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+            } catch (IOException e) {
+                throw Throwables.propagate(e);
+            }
         }
     }
 
@@ -898,15 +922,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.size();
     }
 
-    private static class SnapshotHolder {
+    static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
-        private final ByteString snapshotBytes;
+        private final ByteSource snapshotBytes;
 
-        SnapshotHolder(Snapshot snapshot) {
+        SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+            this.snapshotBytes = snapshotBytes;
         }
 
         long getLastIncludedTerm() {
@@ -917,7 +941,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return lastIncludedIndex;
         }
 
-        ByteString getSnapshotBytes() {
+        ByteSource getSnapshotBytes() {
             return snapshotBytes;
         }
     }