Bug 6540: Fix journal issues on leader changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / AbstractLeader.java
index 5076a8a38f891c894b9e01fdc61db5c0d5fa11a9..180f492082ee05bde4477cd82add5a0c4d846f69 100644 (file)
@@ -15,7 +15,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -189,17 +188,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply) {
-
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
         if(LOG.isTraceEnabled()) {
             LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
         }
 
         // Update the FollowerLogInformation
         String followerId = appendEntriesReply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
+        FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
 
         if(followerLogInformation == null){
             LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
@@ -218,6 +214,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
 
+        long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+        long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
         boolean updated = false;
         if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
             // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
@@ -241,14 +239,32 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             followerLogInformation.setNextIndex(-1);
 
             initiateCaptureSnapshot(followerId);
+
             updated = true;
         } else if (appendEntriesReply.isSuccess()) {
-            updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+            if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 &&
+                    followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
+                // The follower's last entry is present in the leader's journal but the terms don't match so the
+                // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
+                // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
+                // and no longer in the journal. Either way, we set the follower's next index to 1 less than the last
+                // index reported by the follower. For the former case, the leader will send all entries starting with
+                // the previous follower's index and the follower will remove and replace the conflicting entries as
+                // needed. For the latter, the leader will initiate an install snapshot.
+
+                followerLogInformation.setNextIndex(followerLastLogIndex - 1);
+                updated = true;
+
+                LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " +
+                          "leader's {} - set the follower's next index to {}",
+                        logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
+                        followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+            } else {
+                updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+            }
         } else {
             LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
 
-            long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
-            long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex);
             if(appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
                 // in common with this Leader and so would require a snapshot to be installed
@@ -257,8 +273,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Force initiate a snapshot capture
                 initiateCaptureSnapshot(followerId);
-            } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 &&
-                    followersLastLogTerm == appendEntriesReply.getLogLastTerm())) {
+            } else if(followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 &&
+                    followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
                 // The follower's log is empty or the last entry is present in the leader's journal
                 // and the terms match so the follower is just behind the leader's journal from
                 // the last snapshot, if any. We'll catch up the follower quickly by starting at the
@@ -269,10 +285,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // The follower's log conflicts with leader's log so decrement follower's next index by 1
                 // in an attempt to find where the logs match.
 
-                LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index",
-                        logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm);
-
                 followerLogInformation.decrNextIndex();
+                updated = true;
+
+                LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
+                        logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
+                        followerLogInformation.getNextIndex());
             }
         }
 
@@ -507,7 +525,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
                 ActorSelection followerActor = context.getPeerActorSelection(followerId);
                 if(followerActor != null) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, followerLogInformation);
                 }
             }
 
@@ -595,7 +613,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             if (installSnapshotState != null) {
                 // if install snapshot is in process , then sent next chunk if possible
                 if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, followerLogInformation);
                 } else if(sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
@@ -650,21 +668,32 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if(sendAppendEntries) {
-                sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                        entries, followerId);
+                sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
             }
         }
     }
 
-    private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries, String followerId) {
+    private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
+            FollowerLogInformation followerLogInformation) {
+        // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
+        // possibly committing and applying conflicting entries (those with same index, different term) from a prior
+        // term that weren't replicated to a majority, which would be a violation of raft.
+        //     - if the follower isn't active. In this case we don't know the state of the follower and we send an
+        //       empty AppendEntries as a heart beat to prevent election.
+        //     - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
+        //       need to send AppendEntries to prevent election.
+        boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
+        long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
+            context.getCommitIndex();
+
+        long followerNextIndex = followerLogInformation.getNextIndex();
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             getLogEntryIndex(followerNextIndex - 1),
             getLogEntryTerm(followerNextIndex - 1), entries,
-            context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
+            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
 
         if(!entries.isEmpty() || LOG.isTraceEnabled()) {
-            LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
+            LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
                     appendEntries);
         }
 
@@ -672,29 +701,42 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     /**
+     * Initiates a snapshot capture to install on a follower.
+     *
      * Install Snapshot works as follows
-     * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
-     * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
-     * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
-     * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
-     * 4. On complete, Follower sends back a InstallSnapshotReply.
-     * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
-     * and replenishes the memory by deleting the snapshot in Replicated log.
-     * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
-     * then send the existing snapshot in chunks to the follower.
-     * @param followerId
+     *   1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
+     *   2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
+     *      the Leader's handleMessage with a SendInstallSnapshot message.
+     *   3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
+     *      the Follower via InstallSnapshot messages.
+     *   4. For each chunk, the Follower sends back an InstallSnapshotReply.
+     *   5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
+     *      follower.
+     *   6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
+     *      then send the existing snapshot in chunks to the follower.
+     *
+     * @param followerId the id of the follower.
+     * @return true if capture was initiated, false otherwise.
      */
     public boolean initiateCaptureSnapshot(String followerId) {
+        FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
         if (snapshot.isPresent()) {
-            // if a snapshot is present in the memory, most likely another install is in progress
-            // no need to capture snapshot.
-            // This could happen if another follower needs an install when one is going on.
+            // If a snapshot is present in the memory, most likely another install is in progress no need to capture
+            // snapshot. This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
-            sendSnapshotChunk(followerActor, followerId);
+
+            // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
+            sendSnapshotChunk(followerActor, followerLogInfo);
             return true;
         } else {
-            return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+            boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
                     this.getReplicatedToAllIndex(), followerId);
+            if(captureInitiated) {
+                followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+                        context.getConfigParams().getSnapshotChunkSize(), logName()));
+            }
+
+            return captureInitiated;
         }
     }
 
@@ -718,9 +760,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             if (followerActor != null) {
                 long nextIndex = followerLogInfo.getNextIndex();
-                if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
+                if (followerLogInfo.getInstallSnapshotState() != null ||
+                        context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
                         canInstallSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, followerLogInfo);
                 }
             }
         }
@@ -730,61 +773,48 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *  Sends a snapshot chunk to a given follower
      *  InstallSnapshot should qualify as a heartbeat too.
      */
-    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
-        try {
-            if (snapshot.isPresent()) {
-                byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
-
-                // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
-                // followerId to the followerToSnapshot map.
-                LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
-
-                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
-                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-                if(installSnapshotState.isLastChunk(nextChunkIndex)) {
-                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
-                }
-
-                followerActor.tell(
-                    new InstallSnapshot(currentTerm(), context.getId(),
-                        snapshot.get().getLastIncludedIndex(),
-                        snapshot.get().getLastIncludedTerm(),
-                        nextSnapshotChunk,
-                        nextChunkIndex,
-                        installSnapshotState.getTotalChunks(),
-                        Optional.of(installSnapshotState.getLastChunkHashCode()),
-                        serverConfig
-                    ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
-                    actor()
-                );
-
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                            logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
-                            installSnapshotState.getTotalChunks());
-                }
+    private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
+        if (snapshot.isPresent()) {
+            LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+            if (installSnapshotState == null) {
+                installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
+                        logName());
+                followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
-        } catch (IOException e) {
-            LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
-        }
-    }
 
-    /**
-     * Acccepts snaphot as ByteString, enters into map for future chunks
-     * creates and return a ByteString chunk
-     */
-    private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
-        LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
-        if (installSnapshotState == null) {
-            installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
-                    logName());
-            followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState);
-        }
-        byte[] nextChunk = installSnapshotState.getNextChunk();
+            // Ensure the snapshot bytes are set - this is a no-op.
+            installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+
+            byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+
+            LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+                    nextSnapshotChunk.length);
+
+            int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+            Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+            if(installSnapshotState.isLastChunk(nextChunkIndex)) {
+                serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+            }
 
-        LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
+            followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                    snapshot.get().getLastIncludedIndex(),
+                    snapshot.get().getLastIncludedTerm(),
+                    nextSnapshotChunk,
+                    nextChunkIndex,
+                    installSnapshotState.getTotalChunks(),
+                    Optional.of(installSnapshotState.getLastChunkHashCode()),
+                    serverConfig
+                ).toSerializable(followerLogInfo.getRaftVersion()),
+                actor()
+            );
 
-        return nextChunk;
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                        logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
+                        installSnapshotState.getTotalChunks());
+            }
+        }
     }
 
     private void sendHeartBeat() {