Bug 6540: Fix journal issues on leader changes 15/45515/4
authorTom Pantelis <tpanteli@brocade.com>
Fri, 9 Sep 2016 18:08:03 +0000 (14:08 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 26 Sep 2016 03:43:04 +0000 (23:43 -0400)
Fixed a couple issues with journal syncing on leader changes and isolation.

Consider the scenario where a leader is isolated and the majority partition elects
a new leader and both sides of the partition attempt to commit entries independently.
Say the term was 1 and last journal index was 2 prior to isolation and was replicated
to all followers and applied to state. After isolation, the isolated leader appends a
new entry with index 3 and attempts to replicate but fails to reach consensus.
Meanwhile, the new leader appends its own new entry with index 3 and is successfully
replicated to the remaining follower and applied to state. The commitIndex in the
majority partition is now 3. The new leader attempts to send AppendEntries to the
isolated leader but doesn't get any replies so it marks it as inactive.

When the partition is healed, the isolated leader converts to follower when it hears
from the new leader with the higher term. Since the new leader has marked the isolated
leader as inactive, the initial AppendEntries that the previous leader sees will have
no entries and the leaderCommitIndex will be 3. This is greater than the current
commitIndex 2 so the previous leader will update its commitIndex to 3 and apply its
entry with index 3 to the state. However this entry was from the previous term 1 which
was not replicated to a majority of the nodes and conflicts with the new leader's entry
with index 3 and term 2. This is a violation of raft.

This violation occurs as a result of the new leader not sending any entries until it
knows the follower is active. This is for efficiency to avoid continuously trying to
send entries when a follower is down. This is fine however the leader should not send
its current commit index either since it doesn't know the state of the follower. The
intention of the empty AppendEntries in this case is to re-establish connectivity with
the follower and thus should not cause any state change in the follower. Therefore I
changed the code to send leaderCommitIndex as -1 if the follower is inactive.

The other case where the leader purposely sends an empty AppendEntries is when the
leader is in the process of installing a snapshot on a follower, as indicated by the
presence of a LeaderInstallSnapshotState instance in the FollowerLogInformation. The
empty AppendEntries is still sent at the heartbeat interval to prevent an election
timeout in case the snapshot capture/transfer is delayed. Again, the AppendEntries
should not cause any state change in the follower so I also changed the leader to send
-1 for the leaderCommitIndex. As a result, I also changed it so that the leader
immeditely records a LeaderInstallSnapshotState instance in the FollowerLogInformation
when it initiates the async snapshot capture. Previously this was done when the capture
completed and the RaftActor sent the SendInstallSnapshot message to the leader
behavior. However it may take some time to capture the snapshot and intervening AppendEntries heart beats may be sent to the follower.

The other issue in the above scenario is that the conflict with entry 3 is not
immediately detected. On the first AppendEntries, the previous leader reports back
a successful reply with lastLogIndex 3 and lastLogTerm 1 b/c the previous
index (2) and term (1) didn't conflict. The new leader sets the previous leader's
match index to 3 and thinks index 3 has been replicated to all the followers and
trims its in-memory log at index 2. Eventually when the next entry with index 4 is
replicated, the previous leader will detect the conflict as the leader's previous
log index 3 and term 2 will be sent in the next AppendEntries. The new leader will
backtrack and eventually install a snapshot to sync the previous leader however
it's inefficient and should be unnecessary. The leader should detect the conflict
immediately on the first AppendEntries reply. So I changed handleAppendEntriesReply
to check that the follower's lastLogTerm matches the leader's term for that index.
If not, the leader sets the follower's next index to lastLogTerm - 1. This prevents
the leader from trimming its log and the next AppendEntries will include the
conflicting entry which the follower will remove/replace.

Change-Id: I7a0282cc4078f33ffd049e4a0eb4feff6230510d
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
12 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/ApplyState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/IsolationScenarioTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/LeadershipTransferIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/CandidateTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index a8a33c3..9d58288 100644 (file)
@@ -182,7 +182,9 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public void setLeaderInstallSnapshotState(@Nonnull LeaderInstallSnapshotState state) {
-        this.installSnapshotState = Preconditions.checkNotNull(state);
+        if(this.installSnapshotState == null) {
+            this.installSnapshotState = Preconditions.checkNotNull(state);
+        }
     }
 
     @Override
index 7463a93..1f92d5f 100644 (file)
@@ -39,9 +39,6 @@ public class ApplyState implements Serializable {
 
     @Override
     public String toString() {
-        return "ApplyState{" +
-                "identifier='" + identifier + '\'' +
-                ", replicatedLogEntry.index =" + replicatedLogEntry.getIndex() +
-                '}';
+        return "ApplyState [identifier=" + identifier + ", replicatedLogEntry=" + replicatedLogEntry + "]";
     }
 }
index 5076a8a..180f492 100644 (file)
@@ -15,7 +15,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -189,17 +188,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     @Override
-    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender,
-        AppendEntriesReply appendEntriesReply) {
-
+    protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
         if(LOG.isTraceEnabled()) {
             LOG.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
         }
 
         // Update the FollowerLogInformation
         String followerId = appendEntriesReply.getFollowerId();
-        FollowerLogInformation followerLogInformation =
-            followerToLog.get(followerId);
+        FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
 
         if(followerLogInformation == null){
             LOG.error("{}: handleAppendEntriesReply - unknown follower {}", logName(), followerId);
@@ -218,6 +214,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion());
         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
 
+        long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
+        long followersLastLogTermInLeadersLog = getLogEntryTerm(followerLastLogIndex);
         boolean updated = false;
         if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
             // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
@@ -241,14 +239,32 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             followerLogInformation.setNextIndex(-1);
 
             initiateCaptureSnapshot(followerId);
+
             updated = true;
         } else if (appendEntriesReply.isSuccess()) {
-            updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+            if (followerLastLogIndex >= 0 && followersLastLogTermInLeadersLog >= 0 &&
+                    followersLastLogTermInLeadersLog != appendEntriesReply.getLogLastTerm()) {
+                // The follower's last entry is present in the leader's journal but the terms don't match so the
+                // follower has a conflicting entry. Since the follower didn't report that it's out of sync, this means
+                // either the previous leader entry sent didn't conflict or the previous leader entry is in the snapshot
+                // and no longer in the journal. Either way, we set the follower's next index to 1 less than the last
+                // index reported by the follower. For the former case, the leader will send all entries starting with
+                // the previous follower's index and the follower will remove and replace the conflicting entries as
+                // needed. For the latter, the leader will initiate an install snapshot.
+
+                followerLogInformation.setNextIndex(followerLastLogIndex - 1);
+                updated = true;
+
+                LOG.debug("{}: handleAppendEntriesReply: follower {} last log term {} for index {} conflicts with the " +
+                          "leader's {} - set the follower's next index to {}",
+                        logName(), followerId, appendEntriesReply.getLogLastTerm(), appendEntriesReply.getLogLastIndex(),
+                        followersLastLogTermInLeadersLog, followerLogInformation.getNextIndex());
+            } else {
+                updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
+            }
         } else {
             LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
 
-            long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
-            long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex);
             if(appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
                 // in common with this Leader and so would require a snapshot to be installed
@@ -257,8 +273,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Force initiate a snapshot capture
                 initiateCaptureSnapshot(followerId);
-            } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 &&
-                    followersLastLogTerm == appendEntriesReply.getLogLastTerm())) {
+            } else if(followerLastLogIndex < 0 || followersLastLogTermInLeadersLog >= 0 &&
+                    followersLastLogTermInLeadersLog == appendEntriesReply.getLogLastTerm()) {
                 // The follower's log is empty or the last entry is present in the leader's journal
                 // and the terms match so the follower is just behind the leader's journal from
                 // the last snapshot, if any. We'll catch up the follower quickly by starting at the
@@ -269,10 +285,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // The follower's log conflicts with leader's log so decrement follower's next index by 1
                 // in an attempt to find where the logs match.
 
-                LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index",
-                        logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm);
-
                 followerLogInformation.decrNextIndex();
+                updated = true;
+
+                LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index to {}",
+                        logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTermInLeadersLog,
+                        followerLogInformation.getNextIndex());
             }
         }
 
@@ -507,7 +525,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
                 ActorSelection followerActor = context.getPeerActorSelection(followerId);
                 if(followerActor != null) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, followerLogInformation);
                 }
             }
 
@@ -595,7 +613,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             if (installSnapshotState != null) {
                 // if install snapshot is in process , then sent next chunk if possible
                 if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, followerLogInformation);
                 } else if(sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
@@ -650,21 +668,32 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             }
 
             if(sendAppendEntries) {
-                sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                        entries, followerId);
+                sendAppendEntriesToFollower(followerActor, entries, followerLogInformation);
             }
         }
     }
 
-    private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries, String followerId) {
+    private void sendAppendEntriesToFollower(ActorSelection followerActor, List<ReplicatedLogEntry> entries,
+            FollowerLogInformation followerLogInformation) {
+        // In certain cases outlined below we don't want to send the actual commit index to prevent the follower from
+        // possibly committing and applying conflicting entries (those with same index, different term) from a prior
+        // term that weren't replicated to a majority, which would be a violation of raft.
+        //     - if the follower isn't active. In this case we don't know the state of the follower and we send an
+        //       empty AppendEntries as a heart beat to prevent election.
+        //     - if we're in the process of installing a snapshot. In this case we don't send any new entries but still
+        //       need to send AppendEntries to prevent election.
+        boolean isInstallingSnaphot = followerLogInformation.getInstallSnapshotState() != null;
+        long leaderCommitIndex = isInstallingSnaphot || !followerLogInformation.isFollowerActive() ? -1 :
+            context.getCommitIndex();
+
+        long followerNextIndex = followerLogInformation.getNextIndex();
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
             getLogEntryIndex(followerNextIndex - 1),
             getLogEntryTerm(followerNextIndex - 1), entries,
-            context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
+            leaderCommitIndex, super.getReplicatedToAllIndex(), context.getPayloadVersion());
 
         if(!entries.isEmpty() || LOG.isTraceEnabled()) {
-            LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerId,
+            LOG.debug("{}: Sending AppendEntries to follower {}: {}", logName(), followerLogInformation.getId(),
                     appendEntries);
         }
 
@@ -672,29 +701,42 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     }
 
     /**
+     * Initiates a snapshot capture to install on a follower.
+     *
      * Install Snapshot works as follows
-     * 1. Leader initiates the capture snapshot by sending a CaptureSnapshot message to actor
-     * 2. RaftActor on receipt of the CaptureSnapshotReply (from Shard), stores the received snapshot in the replicated log
-     * and makes a call to Leader's handleMessage , with SendInstallSnapshot message.
-     * 3. Leader , picks the snapshot from im-mem ReplicatedLog and sends it in chunks to the Follower
-     * 4. On complete, Follower sends back a InstallSnapshotReply.
-     * 5. On receipt of the InstallSnapshotReply for the last chunk, Leader marks the install complete for that follower
-     * and replenishes the memory by deleting the snapshot in Replicated log.
-     * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
-     * then send the existing snapshot in chunks to the follower.
-     * @param followerId
+     *   1. Leader initiates the capture snapshot by calling createSnapshot on the RaftActor.
+     *   2. On receipt of the CaptureSnapshotReply message, the RaftActor persists the snapshot and makes a call to
+     *      the Leader's handleMessage with a SendInstallSnapshot message.
+     *   3. The Leader obtains and stores the Snapshot from the SendInstallSnapshot message and sends it in chunks to
+     *      the Follower via InstallSnapshot messages.
+     *   4. For each chunk, the Follower sends back an InstallSnapshotReply.
+     *   5. On receipt of the InstallSnapshotReply for the last chunk, the Leader marks the install complete for that
+     *      follower.
+     *   6. If another follower requires a snapshot and a snapshot has been collected (via SendInstallSnapshot)
+     *      then send the existing snapshot in chunks to the follower.
+     *
+     * @param followerId the id of the follower.
+     * @return true if capture was initiated, false otherwise.
      */
     public boolean initiateCaptureSnapshot(String followerId) {
+        FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
         if (snapshot.isPresent()) {
-            // if a snapshot is present in the memory, most likely another install is in progress
-            // no need to capture snapshot.
-            // This could happen if another follower needs an install when one is going on.
+            // If a snapshot is present in the memory, most likely another install is in progress no need to capture
+            // snapshot. This could happen if another follower needs an install when one is going on.
             final ActorSelection followerActor = context.getPeerActorSelection(followerId);
-            sendSnapshotChunk(followerActor, followerId);
+
+            // Note: sendSnapshotChunk will set the LeaderInstallSnapshotState.
+            sendSnapshotChunk(followerActor, followerLogInfo);
             return true;
         } else {
-            return context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+            boolean captureInitiated = context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
                     this.getReplicatedToAllIndex(), followerId);
+            if(captureInitiated) {
+                followerLogInfo.setLeaderInstallSnapshotState(new LeaderInstallSnapshotState(
+                        context.getConfigParams().getSnapshotChunkSize(), logName()));
+            }
+
+            return captureInitiated;
         }
     }
 
@@ -718,9 +760,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             if (followerActor != null) {
                 long nextIndex = followerLogInfo.getNextIndex();
-                if (context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
+                if (followerLogInfo.getInstallSnapshotState() != null ||
+                        context.getPeerInfo(followerId).getVotingState() == VotingState.VOTING_NOT_INITIALIZED ||
                         canInstallSnapshot(nextIndex)) {
-                    sendSnapshotChunk(followerActor, followerId);
+                    sendSnapshotChunk(followerActor, followerLogInfo);
                 }
             }
         }
@@ -730,61 +773,48 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *  Sends a snapshot chunk to a given follower
      *  InstallSnapshot should qualify as a heartbeat too.
      */
-    private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
-        try {
-            if (snapshot.isPresent()) {
-                byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes());
-
-                // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
-                // followerId to the followerToSnapshot map.
-                LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
-
-                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
-                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-                if(installSnapshotState.isLastChunk(nextChunkIndex)) {
-                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
-                }
-
-                followerActor.tell(
-                    new InstallSnapshot(currentTerm(), context.getId(),
-                        snapshot.get().getLastIncludedIndex(),
-                        snapshot.get().getLastIncludedTerm(),
-                        nextSnapshotChunk,
-                        nextChunkIndex,
-                        installSnapshotState.getTotalChunks(),
-                        Optional.of(installSnapshotState.getLastChunkHashCode()),
-                        serverConfig
-                    ).toSerializable(followerToLog.get(followerId).getRaftVersion()),
-                    actor()
-                );
-
-                if(LOG.isDebugEnabled()) {
-                    LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
-                            logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
-                            installSnapshotState.getTotalChunks());
-                }
+    private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
+        if (snapshot.isPresent()) {
+            LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+            if (installSnapshotState == null) {
+                installSnapshotState = new LeaderInstallSnapshotState(context.getConfigParams().getSnapshotChunkSize(),
+                        logName());
+                followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
-        } catch (IOException e) {
-            LOG.error("{}: InstallSnapshot failed for Leader.", logName(), e);
-        }
-    }
 
-    /**
-     * Acccepts snaphot as ByteString, enters into map for future chunks
-     * creates and return a ByteString chunk
-     */
-    private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException {
-        LeaderInstallSnapshotState installSnapshotState = followerToLog.get(followerId).getInstallSnapshotState();
-        if (installSnapshotState == null) {
-            installSnapshotState = new LeaderInstallSnapshotState(snapshotBytes, context.getConfigParams().getSnapshotChunkSize(),
-                    logName());
-            followerToLog.get(followerId).setLeaderInstallSnapshotState(installSnapshotState);
-        }
-        byte[] nextChunk = installSnapshotState.getNextChunk();
+            // Ensure the snapshot bytes are set - this is a no-op.
+            installSnapshotState.setSnapshotBytes(snapshot.get().getSnapshotBytes());
+
+            byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+
+            LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+                    nextSnapshotChunk.length);
+
+            int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+            Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+            if(installSnapshotState.isLastChunk(nextChunkIndex)) {
+                serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+            }
 
-        LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length);
+            followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                    snapshot.get().getLastIncludedIndex(),
+                    snapshot.get().getLastIncludedTerm(),
+                    nextSnapshotChunk,
+                    nextChunkIndex,
+                    installSnapshotState.getTotalChunks(),
+                    Optional.of(installSnapshotState.getLastChunkHashCode()),
+                    serverConfig
+                ).toSerializable(followerLogInfo.getRaftVersion()),
+                actor()
+            );
 
-        return nextChunk;
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
+                        logName(), followerActor.path(), installSnapshotState.getChunkIndex(),
+                        installSnapshotState.getTotalChunks());
+            }
+        }
     }
 
     private void sendHeartBeat() {
index 1a72133..5c9c0b7 100644 (file)
@@ -261,7 +261,9 @@ public class Follower extends AbstractRaftActorBehavior {
         lastIndex = lastIndex();
         long prevCommitIndex = context.getCommitIndex();
 
-        context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
+        if(appendEntries.getLeaderCommit() > prevCommitIndex) {
+            context.setCommitIndex(Math.min(appendEntries.getLeaderCommit(), lastIndex));
+        }
 
         if (prevCommitIndex != context.getCommitIndex()) {
             LOG.debug("{}: Commit index set to {}", logName(), context.getCommitIndex());
@@ -329,9 +331,9 @@ public class Follower extends AbstractRaftActorBehavior {
             // prevLogIndex entry does exist in the follower's log but it has
             // a different term in it
 
-            LOG.debug(
-                    "{}: Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}",
-                    logName(), prevLogTerm, appendEntries.getPrevLogTerm());
+            LOG.debug("{}: The prevLogIndex {} was found in the log but the term {} is not equal to the append entries " +
+                      "prevLogTerm {} - lastIndex: {}, snapshotIndex: {}", logName(), appendEntries.getPrevLogIndex(),
+                      prevLogTerm, appendEntries.getPrevLogTerm(), lastIndex, context.getReplicatedLog().getSnapshotIndex());
         } else if(appendEntries.getPrevLogIndex() == -1 && appendEntries.getPrevLogTerm() == -1
                 && appendEntries.getReplicatedToAllIndex() != -1
                 && !isLogEntryPresent(appendEntries.getReplicatedToAllIndex())) {
index 81c3eff..8000217 100644 (file)
@@ -27,26 +27,36 @@ public final class LeaderInstallSnapshotState {
     // This would be passed as the hash code of the last chunk when sending the first chunk
     static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
 
-    private int snapshotChunkSize;
-    private final ByteString snapshotBytes;
+    private final int snapshotChunkSize;
     private final String logName;
+    private ByteString snapshotBytes;
     private int offset = 0;
     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
-    private int replyReceivedForOffset;
+    private int replyReceivedForOffset = -1;
     // if replyStatus is false, the previous chunk is attempted
     private boolean replyStatus = false;
-    private int chunkIndex;
-    private final int totalChunks;
+    private int chunkIndex = FIRST_CHUNK_INDEX;
+    private int totalChunks;
     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
 
-    LeaderInstallSnapshotState(ByteString snapshotBytes, int snapshotChunkSize, String logName) {
+    LeaderInstallSnapshotState(int snapshotChunkSize, String logName) {
         this.snapshotChunkSize = snapshotChunkSize;
-        this.snapshotBytes = snapshotBytes;
         this.logName = logName;
+    }
+
+    ByteString getSnapshotBytes() {
+        return snapshotBytes;
+    }
+
+    void setSnapshotBytes(ByteString snapshotBytes) {
+        if(this.snapshotBytes != null) {
+            return;
+        }
+
+        this.snapshotBytes = snapshotBytes;
         int size = snapshotBytes.size();
-        totalChunks = size / snapshotChunkSize +
-                (size % snapshotChunkSize > 0 ? 1 : 0);
+        totalChunks = (size / snapshotChunkSize) + (size % snapshotChunkSize > 0 ? 1 : 0);
 
         LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks);
 
@@ -54,10 +64,6 @@ public final class LeaderInstallSnapshotState {
         chunkIndex = FIRST_CHUNK_INDEX;
     }
 
-    ByteString getSnapshotBytes() {
-        return snapshotBytes;
-    }
-
     int incrementOffset() {
         if(replyStatus) {
             // if prev chunk failed, we would want to sent the same chunk again
@@ -84,7 +90,7 @@ public final class LeaderInstallSnapshotState {
 
     boolean canSendNextChunk() {
         // we only send a false if a chunk is sent but we have not received a reply yet
-        return replyReceivedForOffset == offset;
+        return snapshotBytes != null && replyReceivedForOffset == offset;
     }
 
     boolean isLastChunk(int index) {
@@ -110,7 +116,7 @@ public final class LeaderInstallSnapshotState {
         int size = snapshotChunkSize;
         if (snapshotChunkSize > snapshotLength) {
             size = snapshotLength;
-        } else if (start + snapshotChunkSize > snapshotLength) {
+        } else if ((start + snapshotChunkSize) > snapshotLength) {
             size = snapshotLength - start;
         }
 
index 5d3b2aa..6e2f25b 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static akka.pattern.Patterns.ask;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
@@ -16,6 +17,8 @@ import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
@@ -24,12 +27,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 import org.junit.After;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -40,6 +47,7 @@ import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -78,7 +86,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     public static class TestRaftActor extends MockRaftActor {
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
-        private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
+        private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
 
         private TestRaftActor(Builder builder) {
             super(builder);
@@ -86,7 +94,11 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
 
         public void startDropMessages(Class<?> msgClass) {
-            dropMessages.put(msgClass, Boolean.TRUE);
+            dropMessages.put(msgClass, msg -> true);
+        }
+
+        <T> void startDropMessages(Class<T> msgClass, Predicate<T> filter) {
+            dropMessages.put(msgClass, filter);
         }
 
         public void stopDropMessages(Class<?> msgClass) {
@@ -97,6 +109,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null);
         }
 
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         @Override
         public void handleCommand(Object message) {
             if(message instanceof MockPayload) {
@@ -117,7 +130,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             }
 
             try {
-                if(!dropMessages.containsKey(message.getClass())) {
+                Predicate drop = dropMessages.get(message.getClass());
+                if(drop == null || !drop.test(message)) {
                     super.handleCommand(message);
                 }
             } finally {
@@ -206,7 +220,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected DefaultConfigParamsImpl newLeaderConfigParams() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
-        configParams.setElectionTimeoutFactor(1);
+        configParams.setElectionTimeoutFactor(4);
         configParams.setSnapshotBatchCount(snapshotBatchCount);
         configParams.setSnapshotDataThresholdPercentage(70);
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
@@ -355,4 +369,26 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
                 actor.getCurrentBehavior().getReplicatedToAllIndex());
     }
+
+    static void verifyRaftState(ActorRef raftActor, Consumer<OnDemandRaftState> verifier) {
+        Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
+        AssertionError lastError = null;
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            try {
+                OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+                        GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+                verifier.accept(raftState);
+                return;
+            } catch (AssertionError e) {
+                lastError = e;
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            } catch (Exception e) {
+                lastError = new AssertionError("OnDemandRaftState failed", e);
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        throw lastError;
+    }
 }
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/IsolationScenarioTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/IsolationScenarioTest.java
new file mode 100644 (file)
index 0000000..22dff6a
--- /dev/null
@@ -0,0 +1,473 @@
+/*
+ * Copyright (c) 2016 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.getAllMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.assertNoneMatching;
+
+import akka.actor.Actor;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
+import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests isolation of nodes end-to-end.
+ *
+ * @author Thomas Pantelis
+ */
+public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
+    private TestActorRef<Actor> follower1NotifierActor;
+    private TestActorRef<Actor> leaderNotifierActor;
+
+    /**
+     * Isolates the leader after all initial payload entries have been committed and applied on all nodes. While
+     * isolated, the majority partition elects a new leader and both sides of the partition attempt to commit one entry
+     * independently. After isolation is removed, the entry will conflict and both sides should reconcile their logs
+     * appropriately.
+     */
+    @Test
+    public void testLeaderIsolationWithAllPriorEntriesCommitted() throws Exception {
+        testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted starting");
+
+        createRaftActors();
+
+        // Send an initial payloads and verify replication.
+
+        MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+        MockPayload payload1 = sendPayloadData(leaderActor, "one");
+        verifyApplyJournalEntries(leaderCollectorActor, 1);
+        verifyApplyJournalEntries(follower1CollectorActor, 1);
+        verifyApplyJournalEntries(follower2CollectorActor, 1);
+
+        isolateLeader();
+
+        // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
+
+        testLog.info("Sending payload to isolated leader");
+
+        MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
+
+        // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
+        // is collected but not forwarded to the follower RaftActor.
+
+        AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
+        assertEquals("getTerm", currentTerm, appendEntries.getTerm());
+        assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
+        assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
+        verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
+
+        // The leader should transition to IsolatedLeader.
+
+        expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+                rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+
+        forceElectionOnFollower1();
+
+        // Send a payload to the new leader follower1 with index 2 and verify it's replicated to follower2 and committed.
+
+        testLog.info("Sending payload to new leader");
+
+        MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+        verifyApplyJournalEntries(follower1CollectorActor, 2);
+        verifyApplyJournalEntries(follower2CollectorActor, 2);
+
+        assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+        assertEquals("Follower 1 journal last index", 2, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", 2, follower1Context.getCommitIndex());
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
+
+        assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
+                follower1Actor.underlyingActor().getState());
+
+        removeIsolation();
+
+        // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
+        // with a higher term.
+
+        expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+
+        // The previous leader has a conflicting log entry at index 2 with a different term which should get
+        // replaced by the new leader's index 1 entry.
+
+        verifyApplyJournalEntries(leaderCollectorActor, 2);
+
+        assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
+        assertEquals("Prior leader journal last index", 2, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Prior leader commit index", 2, leaderContext.getCommitIndex());
+        verifyReplicatedLogEntry(leaderContext.getReplicatedLog().get(2), currentTerm, 2, newLeaderPayload2);
+
+        assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
+                leaderActor.underlyingActor().getState());
+
+        testLog.info("testLeaderIsolationWithAllPriorEntriesCommitted ending");
+    }
+
+    /**
+     * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
+     * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
+     * partition attempt to commit one entry independently. After isolation is removed, the entry will conflict and both
+     * sides should reconcile their logs appropriately.
+     */
+    @Test
+    public void testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry() throws Exception {
+        testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry starting");
+
+        createRaftActors();
+
+        // Submit an initial payload that is committed/applied on all nodes.
+
+        MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+        verifyApplyJournalEntries(leaderCollectorActor, 0);
+        verifyApplyJournalEntries(follower1CollectorActor, 0);
+        verifyApplyJournalEntries(follower2CollectorActor, 0);
+
+        // Submit another payload that is replicated to all followers and committed on the leader but the leader is
+        // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
+        // with the updated leader commit index.
+
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
+
+        MockPayload payload1 = sendPayloadData(leaderActor, "one");
+
+        // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
+        // message is forwarded to the followers.
+
+        expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
+            return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
+                    ae.getEntries().get(0).getData().equals(payload1);
+        });
+
+        expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
+            return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
+                    ae.getEntries().get(0).getData().equals(payload1);
+        });
+
+        verifyApplyJournalEntries(leaderCollectorActor, 1);
+
+        isolateLeader();
+
+        // Send a payload to the isolated leader so it has an uncommitted log entry with index 2.
+
+        testLog.info("Sending payload to isolated leader");
+
+        MockPayload isolatedLeaderPayload2 = sendPayloadData(leaderActor, "two");
+
+        // Wait for the isolated leader to send AppendEntries to follower1 with the entry at index 2. Note the message
+        // is collected but not forwarded to the follower RaftActor.
+
+        AppendEntries appendEntries = expectFirstMatching(follower1CollectorActor, AppendEntries.class);
+        assertEquals("getTerm", currentTerm, appendEntries.getTerm());
+        assertEquals("getLeaderId", leaderId, appendEntries.getLeaderId());
+        assertEquals("getEntries().size()", 1, appendEntries.getEntries().size());
+        verifyReplicatedLogEntry(appendEntries.getEntries().get(0), currentTerm, 2, isolatedLeaderPayload2);
+
+        // The leader should transition to IsolatedLeader.
+
+        expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+                rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+
+        forceElectionOnFollower1();
+
+        // Send a payload to the new leader follower1 and verify it's replicated to follower2 and committed. Since the
+        // entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
+        // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload will have index 3.
+
+        testLog.info("Sending payload to new leader");
+
+        MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+        verifyApplyJournalEntries(follower1CollectorActor, 3);
+        verifyApplyJournalEntries(follower2CollectorActor, 3);
+
+        assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+        assertEquals("Follower 1 journal last index", 3, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", 3, follower1Context.getCommitIndex());
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(3), currentTerm, 3, newLeaderPayload2);
+
+        assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
+                follower1Actor.underlyingActor().getState());
+
+        removeIsolation();
+
+        // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
+        // with a higher term.
+
+        expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+
+        // The previous leader has a conflicting log entry at index 2 with a different term which should get
+        // replaced by the new leader's entry.
+
+        verifyApplyJournalEntries(leaderCollectorActor, 3);
+
+        verifyRaftState(leaderActor, raftState -> {
+            assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
+            assertEquals("Prior leader journal last index", 3, leaderContext.getReplicatedLog().lastIndex());
+            assertEquals("Prior leader commit index", 3, leaderContext.getCommitIndex());
+        });
+
+        assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2),
+                leaderActor.underlyingActor().getState());
+
+        // Ensure the prior leader didn't apply its conflicting entry with index 2, term 1.
+
+        List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
+        for(ApplyState as: applyState) {
+            if(as.getReplicatedLogEntry().getIndex() == 2 && as.getReplicatedLogEntry().getTerm() == 1) {
+                fail("Got unexpected ApplyState: " + as);
+            }
+        }
+
+        // The prior leader should not have needed a snapshot installed in order to get it synced.
+
+        assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
+
+        testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndOneConflictingEntry ending");
+    }
+
+    /**
+     * Isolates the leader with a payload entry that's replicated to all followers and committed on the leader but
+     * uncommitted on the followers. While isolated, the majority partition elects a new leader and both sides of the
+     * partition attempt to commit multiple entries independently. After isolation is removed, the entries will conflict
+     * and both sides should reconcile their logs appropriately.
+     */
+    @Test
+    public void testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries() throws Exception {
+        testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries starting");
+
+        createRaftActors();
+
+        // Submit an initial payload that is committed/applied on all nodes.
+
+        MockPayload payload0 = sendPayloadData(leaderActor, "zero");
+        verifyApplyJournalEntries(leaderCollectorActor, 0);
+        verifyApplyJournalEntries(follower1CollectorActor, 0);
+        verifyApplyJournalEntries(follower2CollectorActor, 0);
+
+        // Submit another payload that is replicated to all followers and committed on the leader but the leader is
+        // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
+        // with the updated leader commit index.
+
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderCommit() == 1);
+
+        MockPayload payload1 = sendPayloadData(leaderActor, "one");
+
+        // Wait for the isolated leader to send AppendEntries to the followers with the new entry with index 1. This
+        // message is forwarded to the followers.
+
+        expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
+            return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
+                    ae.getEntries().get(0).getData().equals(payload1);
+        });
+
+        expectFirstMatching(follower2CollectorActor, AppendEntries.class, ae -> {
+            return ae.getEntries().size() == 1 && ae.getEntries().get(0).getIndex() == 1 &&
+                    ae.getEntries().get(0).getData().equals(payload1);
+        });
+
+        verifyApplyJournalEntries(leaderCollectorActor, 1);
+
+        isolateLeader();
+
+        // Send 3 payloads to the isolated leader so it has uncommitted log entries.
+
+        testLog.info("Sending 3 payloads to isolated leader");
+
+        sendPayloadData(leaderActor, "two");
+        sendPayloadData(leaderActor, "three");
+        sendPayloadData(leaderActor, "four");
+
+        // Wait for the isolated leader to send AppendEntries to follower1 for each new entry. Note the messages
+        // are collected but not forwarded to the follower RaftActor.
+
+        expectFirstMatching(follower1CollectorActor, AppendEntries.class, ae -> {
+            for(ReplicatedLogEntry e: ae.getEntries()) {
+                if(e.getIndex() == 4) {
+                    return true;
+                }
+            }
+            return false;
+        });
+
+        // The leader should transition to IsolatedLeader.
+
+        expectFirstMatching(leaderNotifierActor, RoleChanged.class,
+                rc -> rc.getNewRole().equals(RaftState.IsolatedLeader.name()));
+
+        forceElectionOnFollower1();
+
+        // Send 3 payloads to the new leader follower1 and verify they're replicated to follower2 and committed. Since
+        // the entry with index 1 from the previous term was uncommitted, the new leader should've also committed a
+        // NoopPayload entry with index 2 in the PreLeader state. Thus the new payload indices will start at 3.
+
+        testLog.info("Sending 3 payloads to new leader");
+
+        MockPayload newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+        MockPayload newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
+        MockPayload newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
+        verifyApplyJournalEntries(follower1CollectorActor, 5);
+        verifyApplyJournalEntries(follower2CollectorActor, 5);
+
+        assertEquals("Follower 1 journal last term", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+        assertEquals("Follower 1 journal last index", 5, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower 1 commit index", 5, follower1Context.getCommitIndex());
+        verifyReplicatedLogEntry(follower1Context.getReplicatedLog().get(5), currentTerm, 5, newLeaderPayload4);
+
+        assertEquals("Follower 1 state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
+                newLeaderPayload4), follower1Actor.underlyingActor().getState());
+
+        removeIsolation();
+
+        // Previous leader should switch to follower b/c it will receive either an AppendEntries or AppendEntriesReply
+        // with a higher term.
+
+        expectFirstMatching(leaderNotifierActor, RoleChanged.class, rc -> rc.getNewRole().equals(RaftState.Follower.name()));
+
+        // The previous leader has conflicting log entries starting at index 2 with different terms which should get
+        // replaced by the new leader's entries.
+
+        verifyApplyJournalEntries(leaderCollectorActor, 5);
+
+        verifyRaftState(leaderActor, raftState -> {
+            assertEquals("Prior leader journal last term", currentTerm, leaderContext.getReplicatedLog().lastTerm());
+            assertEquals("Prior leader journal last index", 5, leaderContext.getReplicatedLog().lastIndex());
+            assertEquals("Prior leader commit index", 5, leaderContext.getCommitIndex());
+        });
+
+        assertEquals("Prior leader state", Lists.newArrayList(payload0, payload1, newLeaderPayload2, newLeaderPayload3,
+                newLeaderPayload4), leaderActor.underlyingActor().getState());
+
+        // Ensure the prior leader didn't apply any of its conflicting entries with term 1.
+
+        List<ApplyState> applyState = getAllMatching(leaderCollectorActor, ApplyState.class);
+        for(ApplyState as: applyState) {
+            if(as.getReplicatedLogEntry().getTerm() == 1) {
+                fail("Got unexpected ApplyState: " + as);
+            }
+        }
+
+        // The prior leader should not have needed a snapshot installed in order to get it synced.
+
+        assertNoneMatching(leaderCollectorActor, InstallSnapshot.class);
+
+        testLog.info("testLeaderIsolationWithPriorUncommittedEntryAndMultipleConflictingEntries ending");
+    }
+
+    private void removeIsolation() {
+        testLog.info("Removing isolation");
+
+        clearMessages(leaderNotifierActor);
+        clearMessages(leaderCollectorActor);
+
+        leaderActor.underlyingActor().stopDropMessages(AppendEntries.class);
+        leaderActor.underlyingActor().stopDropMessages(RequestVote.class);
+        follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+        follower2Actor.underlyingActor().stopDropMessages(AppendEntries.class);
+    }
+
+    private void forceElectionOnFollower1() {
+        // Force follower1 to start an election. follower2 should grant the vote.
+
+        testLog.info("Forcing election on {}", follower1Id);
+
+        follower1Actor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+        expectFirstMatching(follower1NotifierActor, RoleChanged.class,
+                rc -> rc.getNewRole().equals(RaftState.Leader.name()));
+
+        currentTerm = follower1Context.getTermInformation().getCurrentTerm();
+    }
+
+    private void isolateLeader() {
+        // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
+
+        testLog.info("Isolating the leader");
+
+        leaderActor.underlyingActor().startDropMessages(AppendEntries.class);
+        leaderActor.underlyingActor().startDropMessages(RequestVote.class);
+
+        follower1Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
+        follower2Actor.underlyingActor().startDropMessages(AppendEntries.class, ae -> ae.getLeaderId().equals(leaderId));
+
+        clearMessages(follower1CollectorActor);
+        clearMessages(follower1NotifierActor);
+        clearMessages(leaderNotifierActor);
+    }
+
+    private void createRaftActors() {
+        testLog.info("createRaftActors starting");
+
+        follower1NotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(follower1Id + "-notifier"));
+
+        DefaultConfigParamsImpl followerConfigParams = new DefaultConfigParamsImpl();
+        followerConfigParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
+        followerConfigParams.setElectionTimeoutFactor(1000);
+        follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower2Id, testActorPath(follower2Id))).
+                config(followerConfigParams).roleChangeNotifier(follower1NotifierActor));
+
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), followerConfigParams);
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).
+                put(follower2Id, follower2Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderConfigParams.setIsolatedLeaderCheckInterval(new FiniteDuration(500, TimeUnit.MILLISECONDS));
+
+        leaderNotifierActor = factory.createTestActor(Props.create(MessageCollectorActor.class),
+                factory.generateActorId(leaderId + "-notifier"));
+
+        leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
+                config(leaderConfigParams).roleChangeNotifier(leaderNotifierActor));
+
+        follower1CollectorActor = follower1Actor.underlyingActor().collectorActor();
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+        leaderCollectorActor = leaderActor.underlyingActor().collectorActor();
+
+        leaderActor.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+        waitUntilLeader(leaderActor);
+
+        expectMatching(leaderCollectorActor, AppendEntriesReply.class, 2);
+
+
+        clearMessages(leaderCollectorActor);
+        clearMessages(follower1CollectorActor);
+        clearMessages(follower2CollectorActor);
+
+        leaderContext = leaderActor.underlyingActor().getRaftActorContext();
+        currentTerm = leaderContext.getTermInformation().getCurrentTerm();
+
+        follower1Context = follower1Actor.underlyingActor().getRaftActorContext();
+        follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+
+        testLog.info("createRaftActors ending");
+    }
+}
index acffda7..a86e9e1 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import static akka.pattern.Patterns.ask;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
@@ -16,16 +15,11 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
-import akka.util.Timeout;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
-import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
-import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
@@ -167,23 +161,8 @@ public class LeadershipTransferIntegrationTest extends AbstractRaftActorIntegrat
         testLog.info("createRaftActors starting");
     }
 
-    private static void verifyRaftState(ActorRef raftActor, final RaftState expState) throws Throwable {
-        Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
-        Throwable lastError = null;
-        Stopwatch sw = Stopwatch.createStarted();
-        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
-            try {
-                OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
-                        GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
-                assertEquals("getRaftState", expState.toString(), raftState.getRaftState());
-                return;
-            } catch (Exception | AssertionError e) {
-                lastError = e;
-                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-            }
-        }
-
-        throw lastError;
+    private static void verifyRaftState(ActorRef raftActor, final RaftState expState) {
+        verifyRaftState(raftActor, rs -> assertEquals("getRaftState", expState.toString(), rs.getRaftState()));
     }
 
     private static void assertNullLeaderIdChange(TestActorRef<MessageCollectorActor> notifierActor) {
index a41c501..bb50a74 100644 (file)
@@ -82,6 +82,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
         setReplicatedLog(replicatedLog);
         setCommitIndex(replicatedLog.lastIndex());
+        setLastApplied(replicatedLog.lastIndex());
     }
 
     @Override public ActorRef actorOf(Props props) {
index 747bbe7..c6f5d72 100644 (file)
@@ -12,11 +12,13 @@ import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
 import akka.persistence.SaveSnapshotSuccess;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.junit.Assert;
 import org.junit.Test;
@@ -312,7 +314,11 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
 
-        testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaAppendEntries: sending 1 more payload to trigger second snapshot");
+        testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot: sending 1 more payload to trigger second snapshot");
+
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
 
         // Send another payload to trigger a second leader snapshot.
         MockPayload payload7 = sendPayloadData(leaderActor, "seven");
@@ -415,6 +421,10 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
 
         expSnapshotState.add(payload1);
 
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
+
         // Send another payload with a large enough relative size in combination with the last payload
         // that exceeds the memory threshold (70% * 1000 = 700) - this should do a snapshot.
         MockPayload payload2 = sendPayloadData(leaderActor, "two", 201);
index 924ba57..65b3da6 100644 (file)
@@ -124,6 +124,7 @@ public class CandidateTest extends AbstractRaftActorBehaviorTest<Candidate> {
     @Test
     public void testBecomePreLeaderOnReceivingMajorityVotesInThreeNodeCluster(){
         MockRaftActorContext raftActorContext = createActorContext();
+        raftActorContext.setLastApplied(-1);
         raftActorContext.setPeerAddresses(setupPeers(2));
         candidate = new Candidate(raftActorContext);
 
index 9e91e51..f13938c 100644 (file)
@@ -202,6 +202,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
         actorContext.setCommitIndex(-1);
+        actorContext.setLastApplied(-1);
 
         // The raft context is initialized with a couple log entries. However the commitIndex
         // is -1, simulating that the leader previously didn't get consensus and thus the log entries weren't
@@ -585,8 +586,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        fts.setSnapshotBytes(bs);
         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
@@ -738,6 +740,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         actorContext.getReplicatedLog().removeFrom(0);
 
         leader = new Leader(actorContext);
+        actorContext.setCurrentBehavior(leader);
 
         // Leader will send an immediate heartbeat - ignore it.
         MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
@@ -761,7 +764,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
         // installed with a SendInstallSnapshot
-        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
 
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
@@ -919,8 +922,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         leader.setSnapshot(Snapshot.create(bs.toByteArray(), Collections.<ReplicatedLogEntry>emptyList(),
                 commitIndex, snapshotTerm, commitIndex, snapshotTerm));
-        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs,
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
+        fts.setSnapshotBytes(bs);
         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
         while(!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
@@ -1160,7 +1164,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         ByteString bs = toByteString(leadersSnapshot);
         byte[] barray = bs.toByteArray();
 
-        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(bs, 50, "test");
+        LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
+        fts.setSnapshotBytes(bs);
 
         assertEquals(bs.size(), barray.length);
 
@@ -1265,7 +1270,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(-1, appendEntries.getLeaderCommit());
         assertEquals(0, appendEntries.getEntries().size());
         assertEquals(0, appendEntries.getPrevLogIndex());
 
@@ -1320,7 +1325,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         // Initial heartbeat
         AppendEntries appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
-        assertEquals(1, appendEntries.getLeaderCommit());
+        assertEquals(-1, appendEntries.getLeaderCommit());
         assertEquals(0, appendEntries.getEntries().size());
         assertEquals(0, appendEntries.getPrevLogIndex());
 
@@ -1392,8 +1397,8 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(followerActor);
         MessageCollectorActor.clearMessages(leaderActor);
 
-        // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        // Verify initial AppendEntries sent.
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
 
@@ -1472,7 +1477,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
@@ -1553,7 +1558,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
 
@@ -1752,7 +1757,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         MessageCollectorActor.clearMessages(leaderActor);
 
         // Verify initial AppendEntries sent with the leader's current commit index.
-        assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
+        assertEquals("getLeaderCommit", -1, appendEntries.getLeaderCommit());
         assertEquals("Log entries size", 0, appendEntries.getEntries().size());
         assertEquals("getPrevLogIndex", 2, appendEntries.getPrevLogIndex());
 
@@ -1979,6 +1984,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());
         leaderActorContext.setCommitIndex(-1);
+        leaderActorContext.setLastApplied(-1);
 
         String nonVotingFollowerId = "nonvoting-follower";
         TestActorRef<ForwardMessageToBehaviorActor> nonVotingFollowerActor = actorFactory.createTestActor(
@@ -2039,6 +2045,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         logStart("testTransferLeadershipWithFollowerInSync");
 
         MockRaftActorContext leaderActorContext = createActorContextWithFollower();
+        leaderActorContext.setLastApplied(-1);
         ((DefaultConfigParamsImpl)leaderActorContext.getConfigParams()).setHeartBeatInterval(
                 new FiniteDuration(1000, TimeUnit.SECONDS));
         leaderActorContext.setReplicatedLog(new MockRaftActorContext.MockReplicatedLogBuilder().build());