Fix issue when AE leader differs from prior install snapshot leader 13/44813/3
authorTom Pantelis <tpanteli@brocade.com>
Mon, 29 Aug 2016 21:11:35 +0000 (17:11 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 30 Aug 2016 01:45:17 +0000 (01:45 +0000)
When a leader snapshot install is in progress, a follower doesn't process
AppendEntries and merely returns a reply with its last term/index. However,
if an install snapshot is initiated by a leader and there is more than one
chunk to send, it's possible for a leader change to occur prior to completing
sending all the chunks. When this happens, the new leader will begin sending
AppendEntries but the follower won't process them and make progress. We need
to clear the follower's snapshot state if the AppendEntries leaderId doesn't
match the prior install snapshot leaderId.

Change-Id: I4051bd064b6a20f4bcfe38b50656488fcb09274e
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java

index 73e2cf9..1a72133 100644 (file)
@@ -56,12 +56,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
     private final SyncStatusTracker initialSyncStatusTracker;
 
-    private final Procedure<ReplicatedLogEntry> appendAndPersistCallback = new Procedure<ReplicatedLogEntry>() {
-        @Override
-        public void apply(ReplicatedLogEntry logEntry) {
-            context.getReplicatedLog().captureSnapshotIfReady(logEntry);
-        }
-    };
+    private final Procedure<ReplicatedLogEntry> appendAndPersistCallback =
+            logEntry -> context.getReplicatedLog().captureSnapshotIfReady(logEntry);
 
     private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted();
     private SnapshotTracker snapshotTracker = null;
@@ -142,6 +138,12 @@ public class Follower extends AbstractRaftActorBehavior {
         // to make it easier to read. Before refactoring ensure tests
         // cover the code properly
 
+        if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) {
+            LOG.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " +
+                    "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId());
+            snapshotTracker = null;
+        }
+
         if (snapshotTracker != null || context.getSnapshotManager().isApplying()) {
             // if snapshot install is in progress, follower should just acknowledge append entries with a reply.
             AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true,
@@ -500,7 +502,7 @@ public class Follower extends AbstractRaftActorBehavior {
         leaderId = installSnapshot.getLeaderId();
 
         if(snapshotTracker == null){
-            snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks());
+            snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId());
         }
 
         updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId());
index bb7a242..9249142 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.raft.behaviors;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
 import java.util.Arrays;
 import org.slf4j.Logger;
@@ -19,14 +20,16 @@ import org.slf4j.Logger;
 public class SnapshotTracker {
     private final Logger LOG;
     private final int totalChunks;
+    private final String leaderId;
     private ByteString collectedChunks = ByteString.EMPTY;
     private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1;
     private boolean sealed = false;
     private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE;
 
-    SnapshotTracker(Logger LOG, int totalChunks){
+    SnapshotTracker(Logger LOG, int totalChunks, String leaderId) {
         this.LOG = LOG;
         this.totalChunks = totalChunks;
+        this.leaderId = Preconditions.checkNotNull(leaderId);
     }
 
     /**
@@ -56,7 +59,7 @@ public class SnapshotTracker {
             }
         }
 
-        sealed = (chunkIndex == totalChunks);
+        sealed = chunkIndex == totalChunks;
         lastChunkIndex = chunkIndex;
         collectedChunks = collectedChunks.concat(ByteString.copyFrom(chunk));
         this.lastChunkHashCode = Arrays.hashCode(chunk);
@@ -75,6 +78,10 @@ public class SnapshotTracker {
         return collectedChunks;
     }
 
+    String getLeaderId() {
+        return leaderId;
+    }
+
     public static class InvalidChunkException extends Exception {
         private static final long serialVersionUID = 1L;
 
index df468cf..b01fd33 100644 (file)
@@ -15,8 +15,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -792,7 +790,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
-        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
         int lastIncludedIndex = 1;
         int chunkIndex = 1;
         InstallSnapshot lastInstallSnapshot = null;
@@ -855,7 +853,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         ByteString bsSnapshot  = createSnapshot();
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
-        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
         int lastIncludedIndex = 1;
 
         // Check that snapshot installation is not in progress
@@ -873,19 +871,61 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         assertNotNull(follower.getSnapshotTracker());
 
         // Send an append entry
-        AppendEntries appendEntries = mock(AppendEntries.class);
-        doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm();
+        AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1,
+                Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1);
 
         follower.handleMessage(leaderActor, appendEntries);
 
         AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
-        assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
-        assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
-        assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm());
+        assertEquals("isSuccess", true, reply.isSuccess());
+        assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex());
+        assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm());
+        assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm());
+
+        assertNotNull(follower.getSnapshotTracker());
+    }
+
+    @Test
+    public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
+        logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
+
+        MockRaftActorContext context = createActorContext();
+
+        follower = createBehavior(context);
 
-        // We should not hit the code that needs to look at prevLogIndex because we are short circuiting
-        verify(appendEntries, never()).getPrevLogIndex();
+        ByteString bsSnapshot  = createSnapshot();
+        int snapshotLength = bsSnapshot.size();
+        int chunkSize = 50;
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
+        int lastIncludedIndex = 1;
 
+        // Check that snapshot installation is not in progress
+        assertNull(follower.getSnapshotTracker());
+
+        // Make sure that we have more than 1 chunk to send
+        assertTrue(totalChunks > 1);
+
+        // Send an install snapshot with the first chunk to start the process of installing a snapshot
+        byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize);
+        follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1,
+                chunkData, 1, totalChunks));
+
+        // Check if snapshot installation is in progress now
+        assertNotNull(follower.getSnapshotTracker());
+
+        // Send appendEntries with a new term and leader.
+        AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1,
+                Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1);
+
+        follower.handleMessage(leaderActor, appendEntries);
+
+        AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class);
+        assertEquals("isSuccess", true, reply.isSuccess());
+        assertEquals("getLogLastIndex", 2, reply.getLogLastIndex());
+        assertEquals("getLogLastTerm", 2, reply.getLogLastTerm());
+        assertEquals("getTerm", 2, reply.getTerm());
+
+        assertNull(follower.getSnapshotTracker());
     }
 
     @Test
@@ -901,7 +941,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         int offset = 0;
         int snapshotLength = bsSnapshot.size();
         int chunkSize = 50;
-        int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0);
+        int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0);
         int lastIncludedIndex = 1;
         int chunkIndex = 1;
         InstallSnapshot lastInstallSnapshot = null;
@@ -1050,7 +1090,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest<Follower> {
         if (chunkSize > snapshotLength) {
             size = snapshotLength;
         } else {
-            if ((start + chunkSize) > snapshotLength) {
+            if (start + chunkSize > snapshotLength) {
                 size = snapshotLength - start;
             }
         }
index c7d98b0..c1bc215 100644 (file)
@@ -48,14 +48,14 @@ public class SnapshotTrackerTest {
 
     @Test
     public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
-        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
+        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
 
         tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
         tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
         tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
 
         // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
-        SnapshotTracker tracker2 = new SnapshotTracker(logger, 2);
+        SnapshotTracker tracker2 = new SnapshotTracker(logger, 2, "leader");
 
         tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
         tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
@@ -68,7 +68,7 @@ public class SnapshotTrackerTest {
         }
 
         // The first chunk's index must at least be FIRST_CHUNK_INDEX
-        SnapshotTracker tracker3 = new SnapshotTracker(logger, 2);
+        SnapshotTracker tracker3 = new SnapshotTracker(logger, 2, "leader");
 
         try {
             tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
@@ -78,7 +78,7 @@ public class SnapshotTrackerTest {
         }
 
         // Out of sequence chunk indexes won't work
-        SnapshotTracker tracker4 = new SnapshotTracker(logger, 2);
+        SnapshotTracker tracker4 = new SnapshotTracker(logger, 2, "leader");
 
         tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
 
@@ -91,7 +91,7 @@ public class SnapshotTrackerTest {
 
         // No exceptions will be thrown when invalid chunk is added with the right sequence
         // If the lastChunkHashCode is missing
-        SnapshotTracker tracker5 = new SnapshotTracker(logger, 2);
+        SnapshotTracker tracker5 = new SnapshotTracker(logger, 2, "leader");
 
         tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
         // Look I can add the same chunk again
@@ -99,7 +99,7 @@ public class SnapshotTrackerTest {
 
         // An exception will be thrown when an invalid chunk is addedd with the right sequence
         // when the lastChunkHashCode is present
-        SnapshotTracker tracker6 = new SnapshotTracker(logger, 2);
+        SnapshotTracker tracker6 = new SnapshotTracker(logger, 2, "leader");
 
         tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
 
@@ -117,7 +117,7 @@ public class SnapshotTrackerTest {
     public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
 
         // Trying to get a snapshot before all chunks have been received will throw an exception
-        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
+        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
 
         tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
         try {
@@ -127,7 +127,7 @@ public class SnapshotTrackerTest {
 
         }
 
-        SnapshotTracker tracker2 = new SnapshotTracker(logger, 3);
+        SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader");
 
         tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE));
         tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
@@ -140,7 +140,7 @@ public class SnapshotTrackerTest {
 
     @Test
     public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
-        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5);
+        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
 
         ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2));
 
@@ -156,7 +156,7 @@ public class SnapshotTrackerTest {
         if (size > snapshotLength) {
             size = snapshotLength;
         } else {
-            if ((start + size) > snapshotLength) {
+            if (start + size > snapshotLength) {
                 size = snapshotLength - start;
             }
         }
@@ -192,4 +192,4 @@ public class SnapshotTrackerTest {
     }
 
 
-}
\ No newline at end of file
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.