Bug 8206: Prevent decr follower next index beyong -1
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 548b920fe771f183b904e2d5e2e65d7001cb746e..4382cfec3237075087837b5f1402cadab249188f 100644 (file)
@@ -14,8 +14,8 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.io.ByteSource;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -227,9 +227,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // snapshot. It's also possible that the follower's last log index is behind the leader's.
             // However in this case the log terms won't match and the logs will conflict - this is handled
             // elsewhere.
-            log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
-                    + "forcing install snaphot", logName(), followerLogInformation.getId(),
-                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
+            log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+                    + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
+                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+                    context.getReplicatedLog().getSnapshotIndex());
 
             followerLogInformation.setMatchIndex(-1);
             followerLogInformation.setNextIndex(-1);
@@ -251,7 +252,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInformation.setNextIndex(followerLastLogIndex - 1);
                 updated = true;
 
-                log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+                log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
                         + "leader's {} - set the follower's next index to {}", logName(),
                         followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
                         followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
@@ -259,7 +260,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
             }
         } else {
-            log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+                    logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
 
             if (appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
@@ -277,16 +279,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // follower's last log index.
 
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+                log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+                    + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+                    appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(),
+                    followerLogInformation.getNextIndex());
             } else {
                 // The follower's log conflicts with leader's log so decrement follower's next index by 1
                 // in an attempt to find where the logs match.
 
-                followerLogInformation.decrNextIndex();
-                updated = true;
+                if (followerLogInformation.decrNextIndex()) {
+                    updated = true;
 
-                log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
-                        logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
-                        followerLogInformation.getNextIndex());
+                    log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+                            logName(), followerId, appendEntriesReply.getLogLastTerm(),
+                            followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+                }
             }
         }
 
@@ -435,7 +443,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (ยง5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+                log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
@@ -491,18 +499,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             if (reply.isSuccess()) {
                 if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
-                    log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
-                            + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
-                            context.getReplicatedLog().getSnapshotIndex() + 1);
 
                     long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     followerLogInformation.clearLeaderInstallSnapshotState();
 
-                    log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
-                        logName(), followerId, followerLogInformation.getMatchIndex(),
-                        followerLogInformation.getNextIndex());
+                    log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+                        + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+                        followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
 
                     if (!anyFollowersInstallingSnapshot()) {
                         // once there are no pending followers receiving snapshots
@@ -521,8 +526,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     installSnapshotState.markSendStatus(true);
                 }
             } else {
-                log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
-                        logName(), reply.getChunkIndex());
+                log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
 
                 installSnapshotState.markSendStatus(false);
             }
@@ -653,17 +657,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
 
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
-                                    + "follower-nextIndex: %d, leader-snapshot-index: %d,  "
-                                    + "leader-last-index: %d", logName(), followerId,
-                                    followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
-                    }
-
                     // Send heartbeat to follower whenever install snapshot is initiated.
                     sendAppendEntries = true;
                     if (canInstallSnapshot(followerNextIndex)) {
+                        log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+                                + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+                                context.getReplicatedLog().size());
+
                         initiateCaptureSnapshot(followerId);
+                    } else {
+                        // It doesn't seem like we should ever reach here - most likely indicates sonething is
+                        // wrong.
+                        log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+                                + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+                                followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+                                context.getReplicatedLog().size());
                     }
 
                 } else if (sendHeartbeat) {
@@ -789,35 +798,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
 
-            // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
+            try {
+                // Ensure the snapshot bytes are set - this is a no-op.
+                installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
-            byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+                byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
-            log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
-                    nextSnapshotChunk.length);
+                log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+                        nextSnapshotChunk.length);
 
-            int nextChunkIndex = installSnapshotState.incrementChunkIndex();
-            Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-            if (installSnapshotState.isLastChunk(nextChunkIndex)) {
-                serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
-            }
-
-            followerActor.tell(
-                new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshotHolder.get().getLastIncludedIndex(),
-                    snapshotHolder.get().getLastIncludedTerm(),
-                    nextSnapshotChunk,
-                    nextChunkIndex,
-                    installSnapshotState.getTotalChunks(),
-                    Optional.of(installSnapshotState.getLastChunkHashCode()),
-                    serverConfig
-                ).toSerializable(followerLogInfo.getRaftVersion()),
-                actor()
-            );
+                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+                if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+                }
 
-            log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
-                    installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        snapshotHolder.get().getLastIncludedIndex(),
+                        snapshotHolder.get().getLastIncludedTerm(),
+                        nextSnapshotChunk,
+                        nextChunkIndex,
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
+                        serverConfig
+                    ).toSerializable(followerLogInfo.getRaftVersion()),
+                    actor()
+                );
+
+                log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+                        installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+            } catch (IOException e) {
+                throw Throwables.propagate(e);
+            }
         }
     }
 
@@ -912,16 +925,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
-        private final ByteString snapshotBytes;
+        private final ByteSource snapshotBytes;
 
         SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            try {
-                this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
-            } catch (IOException e) {
-                throw new RuntimeException("Error reading state", e);
-            }
+            this.snapshotBytes = snapshotBytes;
         }
 
         long getLastIncludedTerm() {
@@ -932,7 +941,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return lastIncludedIndex;
         }
 
-        ByteString getSnapshotBytes() {
+        ByteSource getSnapshotBytes() {
             return snapshotBytes;
         }
     }