Force install snapshot when follower log is ahead 35/41535/2
authorTom Pantelis <tpanteli@brocade.com>
Fri, 1 Jul 2016 04:25:17 +0000 (00:25 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 8 Jul 2016 07:48:58 +0000 (07:48 +0000)
It's possible for a follower's log to actually be ahead of the leader's log.
Normally this doesn't happen in raft as a node cannot become leader if its
log is behind another's. However, the non-voting semantics deviate a bit
from raft. Only voting members participate in elections and can become
leader so it's possible for a non-voting follower to be ahead of the leader.
This can happen if persistence is disabled and all voting members are
restarted. In this case, the voting leader will start out with an empty log
however the non-voting followers still retain the previous data in memory.
On the first AppendEntries, the non-voting follower returns a successful
reply b/c the prevLogIndex sent by the leader is -1 and thus the integrity
checks pass. However the follower's returned lastLogIndex may be higher in
which case we want to reset the follower by installing a snapshot.
Therefore I added a check in AbstractLeader#handeAppendEntriesReply if
the reply lastLogIndex > leader's last index.

Since the initial AppendEntries is sent immediately by the leader,
normally the follower will reply and this change works. However if a
follower happens to be disconnected and doesn't reply for some time, the
leader can still progress with new commits. If the leader has enough
commits such that its lastIndex matches or exceeds the lagging
non-voting follower, this check doesn't work. In this case, the
follower's integrity checks will fail since the leader's prevLogTerm
will differ. On reply the leader will start decrementing the follower's
nextIndex in an attempt to find where the logs match. During this
process the leader may trim its log via replicatedToAllIndex in which
case the follower's nextIndex may no longer be in the leader's log and
the leader will install a snapshot.

However if other nodes are down and prevent the log trimming then the
follower's nextIndex may be in the log until it eventually decrements to
0. The follower's integrity checks will pass in this case since the
leader's prevLogIndex will be -1. The follower will then attempt to add
the leader's log entries to its log. It first loops the log entries in
the AppendEntries with the intent of skipping matching entries in its
log (ie index and term the same) and stopping when it finds an entry
that doesn;t exist or finds one whose term doesn't match, in which case
it removes the entries beginning at this index. However I found some
issue in this code. First it was calling get on the getReplicatedLog
which doesn't take into account that the index may be part of the prior
snaphot and not actually in the log. I changed this check to
isLogEntryPresent which takes into account the snapshot. Second, if it
hits a conflicting entry it tries to remove it from the log. However,
as before, it may be in the snapshot and not in the log in which case
nothing gets removed. To alleviate this, I modified removeFromAndPersist
to return a boolean - false meaning it didn't find the index. In this
case I changed it to send back a reply to force a snapshot.

I added several tests in a new class NonVotingFollowerIntegrationTest
that runs thru various scenarios to cover the cases described above.

While testing I ran into some orthoganl issues that I also fixed.

- if a leader has only non-voting followers, on replicate, it should
  immediately commit and apply to state as it does when there's no
  followers since it doesn't need consensus from non-voting followers.
  So I added a method anyVotingPeers to RaftActorContext to handle this
  case.

- When calculating the prevLogIndex and prevLogTerm for the
  AppendEntries message, it calls get on the getReplicatedLog
  which doesn't take into account that the index may be the snaphot
  index/term. Follower does this check prevLogIndex/prevLogTerm so
  the leader should as well.

Change-Id: I3f92fc0b92ddc6d02dc6cb0e56b444a7c61035d7
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
14 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorContextImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActorRecoverySupport.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractRaftActorBehavior.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActor.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/NonVotingFollowerIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 0d125ed..69b7845 100644 (file)
@@ -214,12 +214,6 @@ public abstract class AbstractReplicatedLogImpl implements ReplicatedLog {
         return snapshotTerm;
     }
 
-    @Override
-    public abstract void appendAndPersist(ReplicatedLogEntry replicatedLogEntry);
-
-    @Override
-    public abstract void removeFromAndPersist(long index);
-
     @Override
     public void setSnapshotIndex(long snapshotIndex) {
         this.snapshotIndex = snapshotIndex;
index 6b5fffe..a099b99 100644 (file)
@@ -243,6 +243,11 @@ public interface RaftActorContext {
      */
     boolean isVotingMember();
 
+    /**
+     * @return true if there are any voting peers, false otherwise.
+     */
+    boolean anyVotingPeers();
+
     /**
      * @return current behavior attached to the raft actor.
      */
index f163c90..7c12169 100644 (file)
@@ -69,6 +69,8 @@ public class RaftActorContextImpl implements RaftActorContext {
 
     private RaftActorBehavior currentBehavior;
 
+    private int numVotingPeers = -1;
+
     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
@@ -237,6 +239,7 @@ public class RaftActorContextImpl implements RaftActorContext {
     @Override
     public void addToPeers(String id, String address, VotingState votingState) {
         peerInfoMap.put(id, new PeerInfo(id, address, votingState));
+        numVotingPeers = -1;
     }
 
     @Override
@@ -245,6 +248,7 @@ public class RaftActorContextImpl implements RaftActorContext {
             votingMember = false;
         } else {
             peerInfoMap.remove(name);
+            numVotingPeers = -1;
         }
     }
 
@@ -332,6 +336,20 @@ public class RaftActorContextImpl implements RaftActorContext {
         return votingMember;
     }
 
+    @Override
+    public boolean anyVotingPeers() {
+        if(numVotingPeers < 0) {
+            numVotingPeers = 0;
+            for(PeerInfo info: getPeers()) {
+                if(info.isVoting()) {
+                    numVotingPeers++;
+                }
+            }
+        }
+
+        return numVotingPeers > 0;
+    }
+
     @Override
     public RaftActorBehavior getCurrentBehavior() {
         return currentBehavior;
index 1c86dcb..68ce62f 100644 (file)
@@ -94,6 +94,7 @@ class RaftActorRecoverySupport {
                         context.getTermInformation().getVotedFor());
             }
 
+            onRecoveryCompletedMessage();
             possiblyRestoreFromSnapshot();
         } else {
             boolean isServerConfigPayload = false;
index 7ff6e86..e7f1be5 100644 (file)
@@ -58,8 +58,9 @@ public interface ReplicatedLog {
      * reconstruct the state of the in-memory replicated log
      *
      * @param index the index of the first log entry to remove
+     * @return
      */
-    void removeFromAndPersist(long index);
+    boolean removeFromAndPersist(long index);
 
     /**
      * Appends an entry to the log.
index 9eaf6c3..dfaed9b 100644 (file)
@@ -44,12 +44,15 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     @Override
-    public void removeFromAndPersist(final long logEntryIndex) {
+    public boolean removeFromAndPersist(final long logEntryIndex) {
         // FIXME: Maybe this should be done after the command is saved
         long adjustedIndex = removeFrom(logEntryIndex);
         if(adjustedIndex >= 0) {
             context.getPersistenceProvider().persist(new DeleteEntries(adjustedIndex), deleteProcedure);
+            return true;
         }
+
+        return false;
     }
 
     @Override
index 5219ebb..f39351b 100644 (file)
@@ -218,13 +218,36 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion());
 
         boolean updated = false;
-        if (appendEntriesReply.isSuccess()) {
+        if(appendEntriesReply.getLogLastIndex() > context.getReplicatedLog().lastIndex()) {
+            // The follower's log is actually ahead of the leader's log. Normally this doesn't happen
+            // in raft as a node cannot become leader if it's log is behind another's. However, the
+            // non-voting semantics deviate a bit from raft. Only voting members participate in
+            // elections and can become leader so it's possible for a non-voting follower to be ahead
+            // of the leader. This can happen if persistence is disabled and all voting members are
+            // restarted. In this case, the voting leader will start out with an empty log however
+            // the non-voting followers still retain the previous data in memory. On the first
+            // AppendEntries, the non-voting follower returns a successful reply b/c the prevLogIndex
+            // sent by the leader is -1 and thus the integrity checks pass. However the follower's returned
+            // lastLogIndex may be higher in which case we want to reset the follower by installing a
+            // snapshot. It's also possible that the follower's last log index is behind the leader's.
+            // However in this case the log terms won't match and the logs will conflict - this is handled
+            // elsewhere.
+            LOG.debug("{}: handleAppendEntriesReply: follower {} lastIndex {} is ahead of our lastIndex {} - forcing install snaphot",
+                    logName(), followerLogInformation.getId(), appendEntriesReply.getLogLastIndex(),
+                    context.getReplicatedLog().lastIndex());
+
+            followerLogInformation.setMatchIndex(-1);
+            followerLogInformation.setNextIndex(-1);
+
+            initiateCaptureSnapshot(followerId);
+            updated = true;
+        } else if (appendEntriesReply.isSuccess()) {
             updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
         } else {
             LOG.debug("{}: handleAppendEntriesReply: received unsuccessful reply: {}", logName(), appendEntriesReply);
 
             long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
-            ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
+            long followersLastLogTerm = getLogEntryTerm(followerLastLogIndex);
             if(appendEntriesReply.isForceInstallSnapshot()) {
                 // Reset the followers match and next index. This is to signal that this follower has nothing
                 // in common with this Leader and so would require a snapshot to be installed
@@ -233,8 +256,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 // Force initiate a snapshot capture
                 initiateCaptureSnapshot(followerId);
-            } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
-                    followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
+            } else if(followerLastLogIndex < 0 || (followersLastLogTerm >= 0 &&
+                    followersLastLogTerm == appendEntriesReply.getLogLastTerm())) {
                 // The follower's log is empty or the last entry is present in the leader's journal
                 // and the terms match so the follower is just behind the leader's journal from
                 // the last snapshot, if any. We'll catch up the follower quickly by starting at the
@@ -242,11 +265,11 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 updated = updateFollowerLogInformation(followerLogInformation, appendEntriesReply);
             } else {
-                // TODO: When we find that the follower is out of sync with the
-                // Leader we simply decrement that followers next index by 1.
-                // Would it be possible to do better than this? The RAFT spec
-                // does not explicitly deal with it but may be something for us to
-                // think about.
+                // The follower's log conflicts with leader's log so decrement follower's next index by 1
+                // in an attempt to find where the logs match.
+
+                LOG.debug("{}: follower's last log term {} conflicts with the leader's {} - dec next index",
+                        logName(), appendEntriesReply.getLogLastTerm(), followersLastLogTerm);
 
                 followerLogInformation.decrNextIndex();
             }
@@ -321,6 +344,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
         //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
         sendUpdatesToFollower(followerId, followerLogInformation, false, !updated);
+
         return this;
     }
 
@@ -516,7 +540,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 logIndex)
         );
 
-        boolean applyModificationToState = followerToLog.isEmpty()
+        boolean applyModificationToState = !context.anyVotingPeers()
                 || context.getRaftPolicy().applyModificationToStateBeforeConsensus();
 
         if(applyModificationToState){
@@ -596,7 +620,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     // then snapshot should be sent
 
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
+                        LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s, " +
                                     "follower-nextIndex: %d, leader-snapshot-index: %d,  " +
                                     "leader-last-index: %d", logName(), followerId,
                                     followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
@@ -626,8 +650,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
         List<ReplicatedLogEntry> entries, String followerId) {
         AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
-            prevLogIndex(followerNextIndex),
-            prevLogTerm(followerNextIndex), entries,
+            getLogEntryIndex(followerNextIndex - 1),
+            getLogEntryTerm(followerNextIndex - 1), entries,
             context.getCommitIndex(), super.getReplicatedToAllIndex(), context.getPayloadVersion());
 
         if(!entries.isEmpty() || LOG.isTraceEnabled()) {
index 4651237..59ba1a9 100644 (file)
@@ -321,26 +321,34 @@ public abstract class AbstractRaftActorBehavior implements RaftActorBehavior {
 
     /**
      *
-     * @return log index from the previous to last entry in the log
+     * @return the log entry index for the given index or -1 if not found
      */
-    protected long prevLogIndex(long index){
-        ReplicatedLogEntry prevEntry =
-            context.getReplicatedLog().get(index - 1);
-        if (prevEntry != null) {
-            return prevEntry.getIndex();
+    protected long getLogEntryIndex(long index){
+        if(index == context.getReplicatedLog().getSnapshotIndex()){
+            return context.getReplicatedLog().getSnapshotIndex();
         }
+
+        ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
+        if(entry != null){
+            return entry.getIndex();
+        }
+
         return -1;
     }
 
     /**
-     * @return log term from the previous to last entry in the log
+     * @return the log entry term for the given index or -1 if not found
      */
-    protected long prevLogTerm(long index){
-        ReplicatedLogEntry prevEntry =
-            context.getReplicatedLog().get(index - 1);
-        if (prevEntry != null) {
-            return prevEntry.getTerm();
+    protected long getLogEntryTerm(long index){
+        if(index == context.getReplicatedLog().getSnapshotIndex()){
+            return context.getReplicatedLog().getSnapshotTerm();
         }
+
+        ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
+        if(entry != null){
+            return entry.getTerm();
+        }
+
         return -1;
     }
 
index d484b25..e4d4266 100644 (file)
@@ -97,28 +97,11 @@ public class Follower extends AbstractRaftActorBehavior {
             return true;
         }
 
-        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
-                .get(index);
-
-        return previousEntry != null;
+        ReplicatedLogEntry entry = context.getReplicatedLog().get(index);
+        return entry != null;
 
     }
 
-    private long getLogEntryTerm(long index){
-        if(index == context.getReplicatedLog().getSnapshotIndex()){
-            return context.getReplicatedLog().getSnapshotTerm();
-        }
-
-        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
-                .get(index);
-
-        if(previousEntry != null){
-            return previousEntry.getTerm();
-        }
-
-        return -1;
-    }
-
     private void updateInitialSyncStatus(long currentLeaderCommit, String leaderId){
         initialSyncStatusTracker.update(leaderId, currentLeaderCommit, context.getCommitIndex());
     }
@@ -184,14 +167,20 @@ public class Follower extends AbstractRaftActorBehavior {
                 // Find the entry up until the one that is not in the follower's log
                 for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
                     ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
-                    ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
 
-                    if (newEntry == null) {
-                        //newEntry not found in the log
+                    if(!isLogEntryPresent(matchEntry.getIndex())) {
+                        // newEntry not found in the log
                         break;
                     }
 
-                    if (newEntry.getTerm() == matchEntry.getTerm()) {
+                    long existingEntryTerm = getLogEntryTerm(matchEntry.getIndex());
+
+                    LOG.debug("{}: matchEntry {} is present: existingEntryTerm: {}", logName(), matchEntry,
+                            existingEntryTerm);
+
+                    // existingEntryTerm == -1 means it's in the snapshot and not in the log. We don't know
+                    // what the term was so we'll assume it matches.
+                    if(existingEntryTerm == -1 || existingEntryTerm == matchEntry.getTerm()) {
                         continue;
                     }
 
@@ -201,7 +190,18 @@ public class Follower extends AbstractRaftActorBehavior {
                                 matchEntry.getIndex());
 
                         // Entries do not match so remove all subsequent entries
-                        context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
+                        if(!context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex())) {
+                            // Could not remove the entries - this means the matchEntry index must be in the
+                            // snapshot and not the log. In this case the prior entries are part of the state
+                            // so we must send back a reply to force a snapshot to completely re-sync the
+                            // follower's log and state.
+
+                            LOG.debug("{}: Could not remove entries - sending reply to force snapshot", logName());
+                            sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+                                    lastTerm(), context.getPayloadVersion(), true), actor());
+                            return this;
+                        }
+
                         break;
                     } else {
                         sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
@@ -212,8 +212,8 @@ public class Follower extends AbstractRaftActorBehavior {
             }
 
             lastIndex = lastIndex();
-            LOG.debug("{}: After cleanup entries to be added from = {}", logName(),
-                        (addEntriesFrom + lastIndex));
+            LOG.debug("{}: After cleanup, lastIndex: {}, entries to be added from: {}", logName(),
+                    lastIndex, addEntriesFrom);
 
             // 4. Append any new entries not already in the log
             for (int i = addEntriesFrom; i < appendEntries.getEntries().size(); i++) {
index 8a8d313..4e2fe80 100644 (file)
@@ -322,7 +322,8 @@ public class AbstractReplicatedLogImplTest {
         }
 
         @Override
-        public void removeFromAndPersist(final long index) {
+        public boolean removeFromAndPersist(final long index) {
+            return true;
         }
 
         @Override
index 411b23d..3ba664b 100644 (file)
@@ -47,7 +47,8 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
     private final Function<Runnable, Void> pauseLeaderFunction;
 
     protected MockRaftActor(AbstractBuilder<?, ?> builder) {
-        super(builder.id, builder.peerAddresses, Optional.fromNullable(builder.config), PAYLOAD_VERSION);
+        super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
+            Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
         state = new ArrayList<>();
         this.actorDelegate = mock(RaftActor.class);
         this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
@@ -114,7 +115,6 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         return state;
     }
 
-
     @Override
     protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
         actorDelegate.applyState(clientActor, identifier, data);
@@ -169,6 +169,7 @@ public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort,
         try {
             Object data = toObject(bytes);
             if (data instanceof List) {
+                state.clear();
                 state.addAll((List<?>) data);
             }
         } catch (Exception e) {
index 723de65..5a4b437 100644 (file)
@@ -145,8 +145,9 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
         }
 
-        @Override public void removeFromAndPersist(long index) {
-            removeFrom(index);
+        @Override
+        public boolean removeFromAndPersist(long index) {
+            return removeFrom(index) >= 0;
         }
 
         @Override
diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/NonVotingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/NonVotingFollowerIntegrationTest.java
new file mode 100644 (file)
index 0000000..adbc5e8
--- /dev/null
@@ -0,0 +1,411 @@
+/*
+ * Copyright (c) 2016 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.raft;
+
+import static org.junit.Assert.assertEquals;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.util.Arrays;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
+import org.opendaylight.controller.cluster.raft.base.messages.SnapshotComplete;
+import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
+
+/**
+ * Integration test for various scenarios involving non-voting followers.
+ *
+ * @author Thomas Pantelis
+ */
+public class NonVotingFollowerIntegrationTest extends AbstractRaftActorIntegrationTest {
+    private TestRaftActor followerInstance;
+    private TestRaftActor leaderInstance;
+
+    /**
+     * Tests non-voting follower re-sync after the non-persistent leader restarts with an empty log. In this
+     * case the follower's log will be ahead of the leader's log as the follower retains the previous
+     * data in memory. The leader must force an install snapshot to re-sync the follower's state.
+     */
+    @Test
+    public void testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart() {
+        testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart starting");
+
+        setupLeaderAndNonVotingFollower();
+
+        // Add log entries and verify they are committed and applied by both nodes.
+
+        expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "one"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "two"));
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
+        MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
+
+        assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
+        assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
+        assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
+
+        // Persisted journal should only contain the ServerConfigurationPayload and 2 UpdateElectionTerm entries.
+        assertEquals("Leader persisted journal size", 3, InMemoryJournal.get(leaderId).size());
+
+        // Restart the leader
+
+        killActor(leaderActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+        createNewLeaderActor();
+
+        //follower1Actor.underlyingActor().startDropMessages(AppendEntries.class);
+
+        currentTerm++;
+        assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
+        assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
+
+        // After restart, the leader's log and the follower's log will be ahead so the leader should force an
+        // install snapshot to re-sync the follower's log and state.
+
+        MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
+
+        assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
+        assertEquals("Follower journal lastIndex", -1, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower commit index", -1, follower1Context.getCommitIndex());
+
+        expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
+
+        MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
+
+        assertEquals("Follower journal lastIndex", 0, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower commit index", 0, follower1Context.getCommitIndex());
+        assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
+
+        testLog.info("testFollowerResyncWithEmptyLeaderLogAfterNonPersistentLeaderRestart ending");
+    }
+
+    /**
+     * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
+     * entries prior to re-connecting to the follower. The leader's last index will still be less than the
+     * follower's last index corresponding to the previous data retained in memory. So the follower's log
+     * will be ahead of the leader's log and the leader must force an install snapshot to re-sync the
+     * follower's state.
+     */
+    @Test
+    public void testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart() {
+        testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart starting");
+
+        setupLeaderAndNonVotingFollower();
+
+        // Add log entries and verify they are committed and applied by both nodes.
+
+        expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "one"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "two"));
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
+        MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
+
+        assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
+        assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
+        assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
+
+        // Restart the leader
+
+        killActor(leaderActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+        // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
+        followerInstance.startDropMessages(AppendEntries.class);
+
+        createNewLeaderActor();
+
+        currentTerm++;
+        assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
+        assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
+
+        // Add new log entries to the leader - one less than the prior log entries
+
+        expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
+        assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
+
+        // Re-enable AppendEntries to the follower. The leaders previous index will be present in the
+        // follower's but the terms won't match and the follower's log will be ahead of the leader's log
+        // The leader should force an install snapshot to re-sync the entire follower's log and state.
+
+        followerInstance.stopDropMessages(AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
+
+        assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
+        assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+        assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
+        assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
+
+        testLog.info("testFollowerResyncWithLessLeaderLogEntriesAfterNonPersistentLeaderRestart ending");
+    }
+
+    /**
+     * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
+     * entries prior to re-connecting to the follower. The leader's last index will be 1 greater than the
+     * follower's last index corresponding to the previous data retained in memory. So the follower's log
+     * will be behind the leader's log but the leader's log entries will have a higher term. In this case the
+     * leader should force an install snapshot to re-sync the follower's state.
+     */
+    @Test
+    public void testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart() {
+        testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart starting");
+
+        setupLeaderAndNonVotingFollower();
+
+        // Add log entries and verify they are committed and applied by both nodes.
+
+        expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "one"));
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 2);
+        MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 2);
+
+        assertEquals("Leader journal lastIndex", 1, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 1, leaderContext.getCommitIndex());
+        assertEquals("Follower journal lastIndex", 1, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower commit index", 1, follower1Context.getCommitIndex());
+        assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
+
+        // Restart the leader
+
+        killActor(leaderActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+        // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
+        followerInstance.startDropMessages(AppendEntries.class);
+
+        createNewLeaderActor();
+
+        currentTerm++;
+        assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
+        assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
+
+        // Add new log entries to the leader - one more than the prior log entries
+
+        expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 3);
+        assertEquals("Leader journal lastIndex", 2, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", 2, leaderContext.getCommitIndex());
+        assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
+
+        // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it should
+        // should force the leader to install snapshot to re-sync the entire follower's log and state.
+
+        followerInstance.stopDropMessages(AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
+
+        assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
+        assertEquals("Follower journal lastIndex", 2, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+        assertEquals("Follower commit index", 2, follower1Context.getCommitIndex());
+        assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
+
+        testLog.info("testFollowerResyncWithOneMoreLeaderLogEntryAfterNonPersistentLeaderRestart ending");
+    }
+
+    /**
+     * Tests non-voting follower re-sync after the non-persistent leader restarts and commits new log
+     * entries prior to re-connecting to the follower. The leader's last index will be greater than the
+     * follower's last index corresponding to the previous data retained in memory. So the follower's log
+     * will be behind the leader's log but the leader's log entries will have a higher term. It also adds a
+     * "down" peer on restart so the leader doesn't trim its log as it's trying to resync the follower.
+     * Eventually the follower should force the leader to install snapshot to re-sync its state.
+     */
+    @Test
+    public void testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart() {
+        testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart starting");
+
+        setupLeaderAndNonVotingFollower();
+
+        // Add log entries and verify they are committed and applied by both nodes.
+
+        expSnapshotState.add(sendPayloadData(leaderActor, "zero"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "one"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "two"));
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
+        MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, expSnapshotState.size());
+
+        long lastIndex = 2;
+        assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
+        assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
+        assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
+
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+        MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
+        assertEquals("Follower snapshot index", lastIndex - 1, follower1Context.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Follower journal size", 1, leaderContext.getReplicatedLog().size());
+
+        // Restart the leader
+
+        killActor(leaderActor);
+        MessageCollectorActor.clearMessages(follower1CollectorActor);
+
+        // Temporarily drop AppendEntries to simulate a disconnect when the leader restarts.
+        followerInstance.startDropMessages(AppendEntries.class);
+
+        // Add a "down" peer so the leader doesn't trim its log as it's trying to resync the follower. The
+        // leader will keep decrementing the follower's nextIndex to try to find a matching index. Since
+        // there is no matching index it will eventually hit index 0 which should cause the follower to
+        // force an install snapshot upon failure to remove the conflicting indexes due to indexes 0 and 1
+        // being in the prior snapshot and not the log.
+        //
+        // We also add another voting follower actor into the mix even though it shoildn't affect the
+        // outcome.
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false),
+                new ServerInfo(follower2Id, true), new ServerInfo("downPeer", false)));
+        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, currentTerm,
+                persistedServerConfig);
+
+        InMemoryJournal.clear();
+        InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(currentTerm, leaderId));
+        InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(follower2Id, 1, persistedServerConfigEntry);
+
+        DefaultConfigParamsImpl follower2ConfigParams = newFollowerConfigParams();
+        follower2ConfigParams.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+        follower2Actor = newTestRaftActor(follower2Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId), follower1Id, follower1Actor.path().toString())).
+                    config(follower2ConfigParams).persistent(Optional.of(false)));
+        TestRaftActor follower2Instance = follower2Actor.underlyingActor();
+        follower2Instance.waitForRecoveryComplete();
+        follower2CollectorActor = follower2Instance.collectorActor();
+
+        peerAddresses = ImmutableMap.of(follower1Id, follower1Actor.path().toString(),
+                follower2Id, follower2Actor.path().toString());
+
+        createNewLeaderActor();
+
+        currentTerm++;
+        assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
+        assertEquals("Leader journal lastIndex", -1, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", -1, leaderContext.getCommitIndex());
+
+        // Add new log entries to the leader - several more than the prior log entries
+
+        expSnapshotState.add(sendPayloadData(leaderActor, "zero-1"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "one-1"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "two-1"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "three-1"));
+        expSnapshotState.add(sendPayloadData(leaderActor, "four-1"));
+
+        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, expSnapshotState.size());
+        MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, expSnapshotState.size());
+
+        lastIndex = 4;
+        assertEquals("Leader journal lastIndex", lastIndex, leaderContext.getReplicatedLog().lastIndex());
+        assertEquals("Leader commit index", lastIndex, leaderContext.getCommitIndex());
+        assertEquals("Leader snapshot index", -1, leaderContext.getReplicatedLog().getSnapshotIndex());
+        assertEquals("Leader replicatedToAllIndex", -1, leaderInstance.getCurrentBehavior().getReplicatedToAllIndex());
+
+        // Re-enable AppendEntries to the follower. The follower's log will be out of sync and it should
+        // should force the leader to install snapshot to re-sync the entire follower's log and state.
+
+        followerInstance.stopDropMessages(AppendEntries.class);
+        MessageCollectorActor.expectFirstMatching(follower1CollectorActor, SnapshotComplete.class);
+
+        assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
+        assertEquals("Follower journal lastIndex", lastIndex, follower1Context.getReplicatedLog().lastIndex());
+        assertEquals("Follower journal lastTerm", currentTerm, follower1Context.getReplicatedLog().lastTerm());
+        assertEquals("Follower commit index", lastIndex, follower1Context.getCommitIndex());
+        assertEquals("Follower applied state", expSnapshotState, followerInstance.getState());
+
+        testLog.info("testFollowerResyncWithMoreLeaderLogEntriesAndDownPeerAfterNonPersistentLeaderRestart ending");
+    }
+
+    private void createNewLeaderActor() {
+        expSnapshotState.clear();
+        leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
+                config(leaderConfigParams).persistent(Optional.of(false)));
+        leaderInstance = leaderActor.underlyingActor();
+        leaderCollectorActor = leaderInstance.collectorActor();
+        waitUntilLeader(leaderActor);
+        leaderContext = leaderInstance.getRaftActorContext();
+    }
+
+    private void setupLeaderAndNonVotingFollower() {
+        snapshotBatchCount = 100;
+        int initialTerm = 1;
+
+        // Set up a persisted ServerConfigurationPayload with the leader voting and the follower non-voting.
+
+        ServerConfigurationPayload persistedServerConfig = new ServerConfigurationPayload(Arrays.asList(
+                new ServerInfo(leaderId, true), new ServerInfo(follower1Id, false)));
+        ReplicatedLogImplEntry persistedServerConfigEntry = new ReplicatedLogImplEntry(0, initialTerm,
+                persistedServerConfig);
+
+        InMemoryJournal.addEntry(leaderId, 1, new UpdateElectionTerm(initialTerm, leaderId));
+        InMemoryJournal.addEntry(leaderId, 2, persistedServerConfigEntry);
+        InMemoryJournal.addEntry(follower1Id, 1, new UpdateElectionTerm(initialTerm, leaderId));
+        InMemoryJournal.addEntry(follower1Id, 2, persistedServerConfigEntry);
+
+        DefaultConfigParamsImpl followerConfigParams = newFollowerConfigParams();
+        follower1Actor = newTestRaftActor(follower1Id, TestRaftActor.newBuilder().peerAddresses(
+                ImmutableMap.of(leaderId, testActorPath(leaderId))).config(followerConfigParams).
+                    persistent(Optional.of(false)));
+
+        peerAddresses = ImmutableMap.<String, String>builder().
+                put(follower1Id, follower1Actor.path().toString()).build();
+
+        leaderConfigParams = newLeaderConfigParams();
+        leaderActor = newTestRaftActor(leaderId, TestRaftActor.newBuilder().peerAddresses(peerAddresses).
+                config(leaderConfigParams).persistent(Optional.of(false)));
+
+        followerInstance = follower1Actor.underlyingActor();
+        follower1CollectorActor = followerInstance.collectorActor();
+
+        leaderInstance = leaderActor.underlyingActor();
+        leaderCollectorActor = leaderInstance.collectorActor();
+
+        leaderContext = leaderInstance.getRaftActorContext();
+        follower1Context = followerInstance.getRaftActorContext();
+
+        waitUntilLeader(leaderActor);
+
+        // Verify leader's context after startup
+
+        currentTerm = initialTerm + 1;
+        assertEquals("Leader term", currentTerm, leaderContext.getTermInformation().getCurrentTerm());
+        assertEquals("Leader server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
+                Sets.newHashSet(leaderContext.getPeerServerInfo(true).getServerConfig()));
+        assertEquals("Leader isVotingMember", true, leaderContext.isVotingMember());
+
+        // Verify follower's context after startup
+
+        MessageCollectorActor.expectFirstMatching(follower1CollectorActor, AppendEntries.class);
+        assertEquals("Follower term", currentTerm, follower1Context.getTermInformation().getCurrentTerm());
+        assertEquals("Follower server config", Sets.newHashSet(persistedServerConfig.getServerConfig()),
+                Sets.newHashSet(follower1Context.getPeerServerInfo(true).getServerConfig()));
+        assertEquals("FollowerisVotingMember", false, follower1Context.isVotingMember());
+    }
+}
index 1caf631..b9bd3cd 100644 (file)
@@ -1413,7 +1413,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         appendEntries = MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
 
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogIndex", 0, appendEntries.getPrevLogIndex());
         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
         assertEquals("First entry index", 1, appendEntries.getEntries().get(0).getIndex());
@@ -1786,7 +1786,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         appendEntries = appendEntriesList.get(1);
         assertEquals("getLeaderCommit", leaderCommitIndex, appendEntries.getLeaderCommit());
-        assertEquals("getPrevLogIndex", -1, appendEntries.getPrevLogIndex());
+        assertEquals("getPrevLogIndex", 1, appendEntries.getPrevLogIndex());
         assertEquals("Log entries size", 2, appendEntries.getEntries().size());
 
         assertEquals("First entry index", 2, appendEntries.getEntries().get(0).getIndex());