BUG 2185 : Follower should request forceInstallSnapshot in out-of-sync scenario 11/26411/2
author Moiz Raja <moraja@cisco.com>
Tue, 4 Aug 2015 00:05:14 +0000 (17:05 -0700)
committer Tom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 14:40:23 +0000 (10:40 -0400)
When the Follower detects that it has more entries in its log than the Leader,
it might be an indication that the Follower was previously a Leader and therefore
has additional entries in its log which are missing in the Leader. When the
RaftPolicy is set to allow commits before consensus, this could also mean that the
Follower's state now contains more data than it should. In this scenario the Follower
requests the Leader to install a snapshot (InstallSnapshot).

Change-Id: I517af148c3933f798ceb87ff88c77c396590881f
Signed-off-by: Moiz Raja <moraja@cisco.com>
(cherry picked from commit 8c3896a99a221d3411f88f3a53c7d9fe6b89734e)

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 85f2f153ab3046d6541225c95d3863be3435b6b0..578989599752f41c0fefd635c879effbc187c357 100644 (file)
@@ -196,7 +196,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             long followerLastLogIndex = appendEntriesReply.getLogLastIndex();
             ReplicatedLogEntry followersLastLogEntry = context.getReplicatedLog().get(followerLastLogIndex);
-            if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
+            if(appendEntriesReply.isForceInstallSnapshot()) {
+                // Reset the follower's match and next index. This signals that this follower has nothing
+                // in common with this Leader and therefore requires a snapshot to be installed
+                followerLogInformation.setMatchIndex(-1);
+                followerLogInformation.setNextIndex(-1);
+
+                // Force initiate a snapshot capture
+                initiateCaptureSnapshot(followerId);
+            } else if(followerLastLogIndex < 0 || (followersLastLogEntry != null &&
                     followersLastLogEntry.getTerm() == appendEntriesReply.getLogLastTerm())) {
                 // The follower's log is empty or the last entry is present in the leader's journal
                 // and the terms match so the follower is just behind the leader's journal from
@@ -539,7 +547,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                     // Send heartbeat to follower whenever install snapshot is initiated.
                     sendAppendEntries = true;
-                    initiateCaptureSnapshot(followerId, followerNextIndex);
+                    if (canInstallSnapshot(followerNextIndex)) {
+                        initiateCaptureSnapshot(followerId);
+                    }
 
                 } else if(sendHeartbeat) {
                     // we send an AppendEntries, even if the follower is inactive
@@ -583,27 +593,32 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      * 6. If another follower requires a snapshot and a snapshot has been collected (via CaptureSnapshotReply)
      * then send the existing snapshot in chunks to the follower.
      * @param followerId
-     * @param followerNextIndex
      */
-    private void initiateCaptureSnapshot(String followerId, long followerNextIndex) {
-        if (!context.getReplicatedLog().isPresent(followerNextIndex) &&
-                context.getReplicatedLog().isInSnapshot(followerNextIndex)) {
-
-            if (snapshot.isPresent()) {
-                // if a snapshot is present in the memory, most likely another install is in progress
-                // no need to capture snapshot.
-                // This could happen if another follower needs an install when one is going on.
-                final ActorSelection followerActor = context.getPeerActorSelection(followerId);
-                sendSnapshotChunk(followerActor, followerId);
+    private void initiateCaptureSnapshot(String followerId) {
+        if (snapshot.isPresent()) {
+            // if a snapshot is present in the memory, most likely another install is in progress
+            // no need to capture snapshot.
+            // This could happen if another follower needs an install when one is going on.
+            final ActorSelection followerActor = context.getPeerActorSelection(followerId);
+            sendSnapshotChunk(followerActor, followerId);
 
 
-            } else {
-                context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
-                        this.getReplicatedToAllIndex(), followerId);
-            }
+        } else {
+            context.getSnapshotManager().captureToInstall(context.getReplicatedLog().last(),
+                    this.getReplicatedToAllIndex(), followerId);
         }
     }
 
+    private boolean canInstallSnapshot(long nextIndex){
+        // If the follower's nextIndex is -1 then we might as well send it a snapshot
+        // Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
+        // in the snapshot
+        return (nextIndex == -1 ||
+                (!context.getReplicatedLog().isPresent(nextIndex)
+                        && context.getReplicatedLog().isInSnapshot(nextIndex)));
+
+    }
+
 
     private void sendInstallSnapshot() {
         LOG.debug("{}: sendInstallSnapshot", logName());
@@ -612,9 +627,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             if (followerActor != null) {
                 long nextIndex = e.getValue().getNextIndex();
-
-                if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                if (canInstallSnapshot(nextIndex)) {
                     sendSnapshotChunk(followerActor, e.getKey());
                 }
             }
index 0cd6fbab52c61343ef1ef93a6a49ffba7efa9b06..974ec475856af94f6e36ad367b7cf555d71354bd 100644 (file)
@@ -147,7 +147,7 @@ public class Follower extends AbstractRaftActorBehavior {
             int addEntriesFrom = 0;
             if (context.getReplicatedLog().size() > 0) {
 
-                // Find the entry up until which the one that is not in the follower's log
+                // Find the entry up until the one that is not in the follower's log
                 for (int i = 0;i < appendEntries.getEntries().size(); i++, addEntriesFrom++) {
                     ReplicatedLogEntry matchEntry = appendEntries.getEntries().get(i);
                     ReplicatedLogEntry newEntry = context.getReplicatedLog().get(matchEntry.getIndex());
@@ -161,12 +161,19 @@ public class Follower extends AbstractRaftActorBehavior {
                         continue;
                     }
 
-                    LOG.debug("{}: Removing entries from log starting at {}", logName(),
+                    if(!context.getRaftPolicy().applyModificationToStateBeforeConsensus()) {
+
+                        LOG.debug("{}: Removing entries from log starting at {}", logName(),
                                 matchEntry.getIndex());
 
-                    // Entries do not match so remove all subsequent entries
-                    context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
-                    break;
+                        // Entries do not match so remove all subsequent entries
+                        context.getReplicatedLog().removeFromAndPersist(matchEntry.getIndex());
+                        break;
+                    } else {
+                        sender.tell(new AppendEntriesReply(context.getId(), currentTerm(), false, lastIndex,
+                                lastTerm(), context.getPayloadVersion(), true), actor());
+                        return this;
+                    }
                 }
             }
 
index 990605b288201c4d0d3156c65c8077d326d25cdf..521a4512c7e49beef57ca56d2ac9f41e0d10a914 100644 (file)
@@ -31,8 +31,15 @@ public class AppendEntriesReply extends AbstractRaftRPC {
 
     private final short payloadVersion;
 
+    private final boolean forceInstallSnapshot;
+
     public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
             short payloadVersion) {
+        this(followerId, term, success, logLastIndex, logLastTerm, payloadVersion, false);
+    }
+
+    public AppendEntriesReply(String followerId, long term, boolean success, long logLastIndex, long logLastTerm,
+                              short payloadVersion, boolean forceInstallSnapshot) {
         super(term);
 
         this.followerId = followerId;
@@ -40,8 +47,10 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.logLastIndex = logLastIndex;
         this.logLastTerm = logLastTerm;
         this.payloadVersion = payloadVersion;
+        this.forceInstallSnapshot = forceInstallSnapshot;
     }
 
+
     @Override
     public long getTerm() {
         return term;
@@ -72,7 +81,12 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         StringBuilder builder = new StringBuilder();
         builder.append("AppendEntriesReply [success=").append(success).append(", logLastIndex=").append(logLastIndex)
                 .append(", logLastTerm=").append(logLastTerm).append(", followerId=").append(followerId)
-                .append(", payloadVersion=").append(payloadVersion).append("]");
+                .append(", payloadVersion=").append(", forceInstallSnapshot=").append(forceInstallSnapshot)
+                .append(payloadVersion).append("]");
         return builder.toString();
     }
+
+    public boolean isForceInstallSnapshot() {
+        return forceInstallSnapshot;
+    }
 }
index 42e35d576731baf6484624743cec160f197d7c59..f189e2d2ee6d140cbc738940540fb66adde82656 100644 (file)
@@ -599,6 +599,44 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         expectAndVerifyAppendEntriesReply(2, true, context.getId(), 2, 3);
     }
 
+    @Test
+    public void testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot() {
+        logStart("testHandleAppendEntriesWhenOutOfSyncLogDetectedRequestForceInstallSnapshot");
+
+        MockRaftActorContext context = createActorContext();
+
+        // First set the receivers term to lower number
+        context.getTermInformation().update(1, "test");
+
+        // Prepare the receivers log
+        MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
+        log.append(newReplicatedLogEntry(1, 0, "zero"));
+        log.append(newReplicatedLogEntry(1, 1, "one"));
+        log.append(newReplicatedLogEntry(1, 2, "two"));
+
+        context.setReplicatedLog(log);
+
+        // Prepare the entries to be sent with AppendEntries
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+        entries.add(newReplicatedLogEntry(2, 2, "two-1"));
+        entries.add(newReplicatedLogEntry(2, 3, "three"));
+
+        // Send appendEntries with the same term as was set on the receiver
+        // before the new behavior was created (1 in this case)
+        // This will not work for a Candidate because as soon as a Candidate
+        // is created it increments the term
+        AppendEntries appendEntries = new AppendEntries(2, "leader", 1, 1, entries, 3, -1, (short)0);
+
+        context.setRaftPolicy(createRaftPolicy(false, true));
+        follower = createBehavior(context);
+
+        RaftActorBehavior newBehavior = follower.handleMessage(leaderActor, appendEntries);
+
+        Assert.assertSame(follower, newBehavior);
+
+        expectAndVerifyAppendEntriesReply(2, false, context.getId(), 1, 2, true);
+    }
+
     @Test
     public void testHandleAppendEntriesPreviousLogEntryMissing(){
         logStart("testHandleAppendEntriesPreviousLogEntryMissing");
@@ -957,6 +995,12 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
 
     private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
             String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+        expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
+    }
+
+    private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
+                                                   String expFollowerId, long expLogLastTerm, long expLogLastIndex,
+                                                   boolean expForceInstallSnapshot) {
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
                 AppendEntriesReply.class);
@@ -967,8 +1011,10 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         assertEquals("getLogLastTerm", expLogLastTerm, reply.getLogLastTerm());
         assertEquals("getLogLastIndex", expLogLastIndex, reply.getLogLastIndex());
         assertEquals("getPayloadVersion", payloadVersion, reply.getPayloadVersion());
+        assertEquals("isForceInstallSnapshot", expForceInstallSnapshot, reply.isForceInstallSnapshot());
     }
 
+
     private ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
         return new MockRaftActorContext.MockReplicatedLogEntry(term, index,
                 new MockRaftActorContext.MockPayload(data));
index 9b877a7cdc2b0b3d804a9a31697e7a1f044f7f3e..a391fdd342cdef3c85320c5a3bd8f9f687066393 100644 (file)
@@ -601,11 +601,6 @@ public class LeaderTest extends AbstractLeaderTest {
 
         MockRaftActorContext actorContext = createActorContextWithFollower();
 
-        Map<String, String> leadersSnapshot = new HashMap<>();
-        leadersSnapshot.put("1", "A");
-        leadersSnapshot.put("2", "B");
-        leadersSnapshot.put("3", "C");
-
         //clears leaders log
         actorContext.getReplicatedLog().removeFrom(0);
 
@@ -656,6 +651,69 @@ public class LeaderTest extends AbstractLeaderTest {
         Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
     }
 
+    @Test
+    public void testInitiateForceInstallSnapshot() throws Exception {
+        logStart("testInitiateForceInstallSnapshot");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        final int followersLastIndex = 2;
+        final int snapshotIndex = -1;
+        final int newEntryIndex = 4;
+        final int snapshotTerm = -1;
+        final int currentTerm = 2;
+
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.setLastApplied(3);
+        actorContext.setCommitIndex(followersLastIndex);
+
+        actorContext.getReplicatedLog().removeFrom(0);
+
+        leader = new Leader(actorContext);
+
+        // Leader will send an immediate heartbeat - ignore it.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        // set the snapshot as absent and check if capture-snapshot is invoked.
+        leader.setSnapshot(null);
+
+        for(int i=0;i<4;i++) {
+            actorContext.getReplicatedLog().append(new ReplicatedLogImplEntry(i, 1,
+                    new MockRaftActorContext.MockPayload("X" + i)));
+        }
+
+        // new entry
+        ReplicatedLogImplEntry entry = new ReplicatedLogImplEntry(newEntryIndex, currentTerm,
+                new MockRaftActorContext.MockPayload("D"));
+
+        actorContext.getReplicatedLog().append(entry);
+
+        //update follower timestamp
+        leader.markFollowerActive(FOLLOWER_ID);
+
+        // Sending this AppendEntriesReply forces the Leader to capture a snapshot, which subsequently gets
+        // installed with a SendInstallSnapshot
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 100, 1, (short) 1, true));
+
+        assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
+
+        CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
+
+        assertTrue(cs.isInstallSnapshotInitiated());
+        assertEquals(3, cs.getLastAppliedIndex());
+        assertEquals(1, cs.getLastAppliedTerm());
+        assertEquals(4, cs.getLastIndex());
+        assertEquals(2, cs.getLastTerm());
+
+        // if an initiate is started again while the first is in progress, it shouldn't initiate another capture
+        leader.handleMessage(leaderActor, new Replicate(null, "state-id", entry));
+
+        Assert.assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+    }
+
+
     @Test
     public void testInstallSnapshot() throws Exception {
         logStart("testInstallSnapshot");
@@ -709,6 +767,56 @@ public class LeaderTest extends AbstractLeaderTest {
         assertEquals(currentTerm, installSnapshot.getTerm());
     }
 
+    @Test
+    public void testForceInstallSnapshot() throws Exception {
+        logStart("testForceInstallSnapshot");
+
+        MockRaftActorContext actorContext = createActorContextWithFollower();
+
+        Map<String, String> leadersSnapshot = new HashMap<>();
+        leadersSnapshot.put("1", "A");
+        leadersSnapshot.put("2", "B");
+        leadersSnapshot.put("3", "C");
+
+        final int lastAppliedIndex = 3;
+        final int snapshotIndex = -1;
+        final int snapshotTerm = -1;
+        final int currentTerm = 2;
+
+        // set the snapshot variables in replicatedlog
+        actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+        actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+        actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+        actorContext.setCommitIndex(lastAppliedIndex);
+        actorContext.setLastApplied(lastAppliedIndex);
+
+        leader = new Leader(actorContext);
+
+        // Initial heartbeat.
+        MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
+
+        leader.getFollower(FOLLOWER_ID).setMatchIndex(-1);
+        leader.getFollower(FOLLOWER_ID).setNextIndex(-1);
+
+        Snapshot snapshot = Snapshot.create(toByteString(leadersSnapshot).toByteArray(),
+                Collections.<ReplicatedLogEntry>emptyList(),
+                lastAppliedIndex, snapshotTerm, lastAppliedIndex, snapshotTerm);
+
+        RaftActorBehavior raftBehavior = leader.handleMessage(leaderActor, new SendInstallSnapshot(snapshot));
+
+        assertTrue(raftBehavior instanceof Leader);
+
+        // check if installsnapshot gets called with the correct values.
+
+        InstallSnapshot installSnapshot = MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+        assertNotNull(installSnapshot.getData());
+        assertEquals(lastAppliedIndex, installSnapshot.getLastIncludedIndex());
+        assertEquals(snapshotTerm, installSnapshot.getLastIncludedTerm());
+
+        assertEquals(currentTerm, installSnapshot.getTerm());
+    }
+
     @Test
     public void testHandleInstallSnapshotReplyLastChunk() throws Exception {
         logStart("testHandleInstallSnapshotReplyLastChunk");