Bug 8206: Prevent decr follower next index beyong -1
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 4fae48e1aa42143d0867338d1ca3a3536ea9b26f..4382cfec3237075087837b5f1402cadab249188f 100644 (file)
@@ -14,7 +14,9 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,7 +35,6 @@ import org.opendaylight.controller.cluster.raft.PeerInfo;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
-import org.opendaylight.controller.cluster.raft.Snapshot;
 import org.opendaylight.controller.cluster.raft.VotingState;
 import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
@@ -47,6 +48,7 @@ import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -83,7 +85,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
 
     private Cancellable heartbeatSchedule = null;
-    private Optional<SnapshotHolder> snapshot = Optional.absent();
+    private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
     private int minReplicationCount;
 
     protected AbstractLeader(RaftActorContext context, RaftState state,
@@ -92,7 +94,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         if (initializeFromLeader != null) {
             followerToLog.putAll(initializeFromLeader.followerToLog);
-            snapshot = initializeFromLeader.snapshot;
+            snapshotHolder = initializeFromLeader.snapshotHolder;
             trackers.addAll(initializeFromLeader.trackers);
         } else {
             for (PeerInfo peerInfo: context.getPeers()) {
@@ -165,17 +167,13 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @VisibleForTesting
-    void setSnapshot(@Nullable Snapshot snapshot) {
-        if (snapshot != null) {
-            this.snapshot = Optional.of(new SnapshotHolder(snapshot));
-        } else {
-            this.snapshot = Optional.absent();
-        }
+    void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+        this.snapshotHolder = Optional.fromNullable(snapshotHolder);
     }
 
     @VisibleForTesting
     boolean hasSnapshot() {
-        return snapshot.isPresent();
+        return snapshotHolder.isPresent();
     }
 
     @Override
@@ -229,9 +227,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // snapshot. It's also possible that the follower's last log index is behind the leader's.
             // However in this case the log terms won't match and the logs will conflict - this is handled
             // elsewhere.
-            log.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - "
-                    + "forcing install snaphot", logName(), followerLogInformation.getId(),
-                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex());
+            log.info("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} "
+                    + "(snapshotIndex {}) - forcing install snaphot", logName(), followerLogInformation.getId(),
+                    appendEntriesReply.getLogLastIndex(), context.getReplicatedLog().lastIndex(),
+                    context.getReplicatedLog().getSnapshotIndex());
 
             followerLogInformation.setMatchIndex(-1);
             followerLogInformation.setNextIndex(-1);
@@ -253,7 +252,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInformation.setNextIndex(followerLastLogIndex - 1);
                 updated = true;
 
-                log.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
+                log.info("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the "
                         + "leader's {} - set the follower's next index to {}", logName(),
                         followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
                         followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
@@ -261,7 +260,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
             }
         } else {
-            log.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
+            log.info("{}: handleAppendEntriesReply - received unsuccessful reply: {}, leader snapshotIndex: {}",
+                    logName(), appendEntriesReply, context.getReplicatedLog().getSnapshotIndex());
 
             if (appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
@@ -279,16 +279,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // follower's last log index.
 
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+
+                log.info("{}: follower {} appears to be behind the leader from the last snapshot - "
+                    + "updated: matchIndex: {}, nextIndex: {}", logName(), followerId,
+                    appendEntriesReply.getLogLastTerm(), followerLogInformation.getMatchIndex(),
+                    followerLogInformation.getNextIndex());
             } else {
                 // The follower's log conflicts with leader's log so decrement follower's next index by 1
                 // in an attempt to find where the logs match.
 
-                followerLogInformation.decrNextIndex();
-                updated = true;
+                if (followerLogInformation.decrNextIndex()) {
+                    updated = true;
 
-                log.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
-                        logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
-                        followerLogInformation.getNextIndex());
+                    log.info("{}: follower {} last log term {} conflicts with the leader's {} - dec next index to {}",
+                            logName(), followerId, appendEntriesReply.getLogLastTerm(),
+                            followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+                }
             }
         }
 
@@ -323,14 +329,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             int replicatedCount = replicatedLogEntry.isPersistencePending() ? 0 : 1;
 
             if (replicatedCount == 0) {
-                // We don't commit and apply a log entry until we've gotten the ack from our local persistence. Ideally
-                // we should be able to update the commit index if we get a consensus amongst the followers
-                // w/o the local persistence ack however this can cause timing issues with snapshot capture
-                // which may lead to an entry that is neither in the serialized snapshot state nor in the snapshot's
-                // unapplied entries. This can happen if the lastAppliedIndex is updated but the corresponding
-                // ApplyState message is still pending in the message queue and thus the corresponding log entry hasn't
-                // actually been applied to the state yet. This would be alleviated by eliminating the ApplyState
-                // message in lieu of synchronously updating lastAppliedIndex and applying to state.
+                // We don't commit and apply a log entry until we've gotten the ack from our local persistence,
+                // even though there *shouldn't* be any issue with updating the commit index if we get a consensus
+                // amongst the followers w/o the local persistence ack.
                 break;
             }
 
@@ -442,7 +443,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (ยง5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
-                log.debug("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+                log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
                         logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
 
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
@@ -456,8 +457,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             sendHeartBeat();
             scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
         } else if (message instanceof SendInstallSnapshot) {
-            // received from RaftActor
-            setSnapshot(((SendInstallSnapshot) message).getSnapshot());
+            SendInstallSnapshot sendInstallSnapshot = (SendInstallSnapshot) message;
+            setSnapshot(new SnapshotHolder(sendInstallSnapshot.getSnapshot(), sendInstallSnapshot.getSnapshotBytes()));
             sendInstallSnapshot();
         } else if (message instanceof Replicate) {
             replicate((Replicate) message);
@@ -498,18 +499,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             if (reply.isSuccess()) {
                 if (installSnapshotState.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
-                    log.debug("{}: InstallSnapshotReply received, last chunk received, Chunk: {}. Follower: {} -"
-                            + " Setting nextIndex: {}", logName(), reply.getChunkIndex(), followerId,
-                            context.getReplicatedLog().getSnapshotIndex() + 1);
 
-                    long followerMatchIndex = snapshot.get().getLastIncludedIndex();
+                    long followerMatchIndex = snapshotHolder.get().getLastIncludedIndex();
                     followerLogInformation.setMatchIndex(followerMatchIndex);
                     followerLogInformation.setNextIndex(followerMatchIndex + 1);
                     followerLogInformation.clearLeaderInstallSnapshotState();
 
-                    log.debug("{}: follower: {}, matchIndex set to {}, nextIndex set to {}",
-                        logName(), followerId, followerLogInformation.getMatchIndex(),
-                        followerLogInformation.getNextIndex());
+                    log.info("{}: Snapshot successfully installed on follower {} (last chunk {}) - "
+                        + "matchIndex set to {}, nextIndex set to {}", logName(), followerId, reply.getChunkIndex(),
+                        followerLogInformation.getMatchIndex(), followerLogInformation.getNextIndex());
 
                     if (!anyFollowersInstallingSnapshot()) {
                         // once there are no pending followers receiving snapshots
@@ -528,8 +526,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     installSnapshotState.markSendStatus(true);
                 }
             } else {
-                log.info("{}: InstallSnapshotReply received sending snapshot chunk failed, Will retry, Chunk: {}",
-                        logName(), reply.getChunkIndex());
+                log.warn("{}: Received failed InstallSnapshotReply - will retry: {}", logName(), reply);
 
                 installSnapshotState.markSendStatus(false);
             }
@@ -611,8 +608,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     /**
      * This method checks if any update needs to be sent to the given follower. This includes append log entries,
      * sending next snapshot chunk, and initiating a snapshot.
-     *
-     * @return true if any update is sent, false otherwise
      */
     private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
                                        boolean sendHeartbeat, boolean isHeartbeat) {
@@ -662,17 +657,22 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // if the follower is just not starting and if leader's index is more than followers index
                     // then snapshot should be sent
 
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, "
-                                    + "follower-nextIndex: %d, leader-snapshot-index: %d,  "
-                                    + "leader-last-index: %d", logName(), followerId,
-                                    followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
-                    }
-
                     // Send heartbeat to follower whenever install snapshot is initiated.
                     sendAppendEntries = true;
                     if (canInstallSnapshot(followerNextIndex)) {
+                        log.info("{}: Initiating install snapshot to follower {}: follower nextIndex: {}, leader "
+                                + "snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(), followerId,
+                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+                                context.getReplicatedLog().size());
+
                         initiateCaptureSnapshot(followerId);
+                    } else {
+                        // It doesn't seem like we should ever reach here - most likely indicates sonething is
+                        // wrong.
+                        log.info("{}: Follower {} is behind but cannot install snapshot: follower nextIndex: {}, "
+                                + "leader snapshotIndex: {}, leader lastIndex: {}, leader log size: {}", logName(),
+                                followerId, followerNextIndex, leaderSnapShotIndex, leaderLastIndex,
+                                context.getReplicatedLog().size());
                     }
 
                 } else if (sendHeartbeat) {
@@ -737,7 +737,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      */
     public boolean initiateCaptureSnapshot(String followerId) {
         FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             // If a snapshot is present in the memory, most likely another install is in progress no need to capture
             // snapshot. This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
@@ -745,16 +745,16 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
             sendSnapshotChunk(followerActor, followerLogInfo);
             return true;
-        } else {
-            boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
-                    this.getReplicatedToAllIndex(), followerId);
-            if (captureInitiated) {
-                followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
-                        context.getConfigParams().getSnapshotChunkSize(), logName()));
-            }
+        }
 
-            return captureInitiated;
+        boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+            this.getReplicatedToAllIndex(), followerId);
+        if (captureInitiated) {
+            followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+                context.getConfigParams().getSnapshotChunkSize(), logName()));
         }
+
+        return captureInitiated;
     }
 
     private boolean canInstallSnapshot(long nextIndex) {
@@ -790,7 +790,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *  InstallSnapshot should qualify as a heartbeat too.
      */
     private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
-        if (snapshot.isPresent()) {
+        if (snapshotHolder.isPresent()) {
             LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
             if (installSnapshotState == null) {
                 installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
@@ -798,35 +798,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
 
-            // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+            try {
+                // Ensure the snapshot bytes are set - this is a no-op.
+                installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
-            byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+                byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
-            log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
-                    nextSnapshotChunk.length);
+                log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+                        nextSnapshotChunk.length);
 
-            int nextChunkIndex = installSnapshotState.incrementChunkIndex();
-            Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-            if (installSnapshotState.isLastChunk(nextChunkIndex)) {
-                serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
-            }
-
-            followerActor.tell(
-                new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshot.get().getLastIncludedIndex(),
-                    snapshot.get().getLastIncludedTerm(),
-                    nextSnapshotChunk,
-                    nextChunkIndex,
-                    installSnapshotState.getTotalChunks(),
-                    Optional.of(installSnapshotState.getLastChunkHashCode()),
-                    serverConfig
-                ).toSerializable(followerLogInfo.getRaftVersion()),
-                actor()
-            );
+                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+                if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+                }
 
-            log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
-                    installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        snapshotHolder.get().getLastIncludedIndex(),
+                        snapshotHolder.get().getLastIncludedTerm(),
+                        nextSnapshotChunk,
+                        nextChunkIndex,
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
+                        serverConfig
+                    ).toSerializable(followerLogInfo.getRaftVersion()),
+                    actor()
+                );
+
+                log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+                        installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+            } catch (IOException e) {
+                throw Throwables.propagate(e);
+            }
         }
     }
 
@@ -918,15 +922,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         return followerToLog.size();
     }
 
-    private static class SnapshotHolder {
+    static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
-        private final ByteString snapshotBytes;
+        private final ByteSource snapshotBytes;
 
-        SnapshotHolder(Snapshot snapshot) {
+        SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            this.snapshotBytes = ByteString.copyFrom(snapshot.getState());
+            this.snapshotBytes = snapshotBytes;
         }
 
         long getLastIncludedTerm() {
@@ -937,7 +941,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return lastIncludedIndex;
         }
 
-        ByteString getSnapshotBytes() {
+        ByteSource getSnapshotBytes() {
             return snapshotBytes;
         }
     }