From: Tom Pantelis Date: Mon, 29 Aug 2016 21:11:35 +0000 (-0400) Subject: Fix issue when AE leader differs from prior install snapshot leader X-Git-Tag: release/carbon~501 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=95d7b8820236d16cb7e37c4a95fcae6f6d55581e Fix issue when AE leader differs from prior install snapshot leader When a leader snapshot install is in progress, a follower doesn't process AppendEntries and merely returns a reply with its last term/index. However, if an install snapshot is initiated by a leader and there is more than one chunk to send, it's possible for a leader change to occur prior to completing sending all the chunks. When this happens, the new leader will begin sending AppendEntries but the follower won't process them, so it never makes progress. We need to clear the follower's snapshot state if the AppendEntries leaderId doesn't match the prior install snapshot leaderId. Change-Id: I4051bd064b6a20f4bcfe38b50656488fcb09274e Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 73e2cf9bc5..1a72133506 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -56,12 +56,8 @@ public class Follower extends AbstractRaftActorBehavior { private final SyncStatusTracker initialSyncStatusTracker; - private final Procedure appendAndPersistCallback = new Procedure() { - @Override - public void apply(ReplicatedLogEntry logEntry) { - context.getReplicatedLog().captureSnapshotIfReady(logEntry); - } - }; + private final Procedure appendAndPersistCallback = + logEntry -> 
context.getReplicatedLog().captureSnapshotIfReady(logEntry); private final Stopwatch lastLeaderMessageTimer = Stopwatch.createStarted(); private SnapshotTracker snapshotTracker = null; @@ -142,6 +138,12 @@ public class Follower extends AbstractRaftActorBehavior { // to make it easier to read. Before refactoring ensure tests // cover the code properly + if (snapshotTracker != null && !snapshotTracker.getLeaderId().equals(appendEntries.getLeaderId())) { + LOG.debug("{}: snapshot install is in progress but the prior snapshot leaderId {} does not match the " + + "AppendEntries leaderId {}", logName(), snapshotTracker.getLeaderId(), appendEntries.getLeaderId()); + snapshotTracker = null; + } + if (snapshotTracker != null || context.getSnapshotManager().isApplying()) { // if snapshot install is in progress, follower should just acknowledge append entries with a reply. AppendEntriesReply reply = new AppendEntriesReply(context.getId(), currentTerm(), true, @@ -500,7 +502,7 @@ public class Follower extends AbstractRaftActorBehavior { leaderId = installSnapshot.getLeaderId(); if(snapshotTracker == null){ - snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks()); + snapshotTracker = new SnapshotTracker(LOG, installSnapshot.getTotalChunks(), installSnapshot.getLeaderId()); } updateInitialSyncStatus(installSnapshot.getLastIncludedIndex(), installSnapshot.getLeaderId()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java index bb7a242481..9249142874 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -9,6 +9,7 @@ package 
org.opendaylight.controller.cluster.raft.behaviors; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.util.Arrays; import org.slf4j.Logger; @@ -19,14 +20,16 @@ import org.slf4j.Logger; public class SnapshotTracker { private final Logger LOG; private final int totalChunks; + private final String leaderId; private ByteString collectedChunks = ByteString.EMPTY; private int lastChunkIndex = AbstractLeader.FIRST_CHUNK_INDEX - 1; private boolean sealed = false; private int lastChunkHashCode = AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE; - SnapshotTracker(Logger LOG, int totalChunks){ + SnapshotTracker(Logger LOG, int totalChunks, String leaderId) { this.LOG = LOG; this.totalChunks = totalChunks; + this.leaderId = Preconditions.checkNotNull(leaderId); } /** @@ -56,7 +59,7 @@ public class SnapshotTracker { } } - sealed = (chunkIndex == totalChunks); + sealed = chunkIndex == totalChunks; lastChunkIndex = chunkIndex; collectedChunks = collectedChunks.concat(ByteString.copyFrom(chunk)); this.lastChunkHashCode = Arrays.hashCode(chunk); @@ -75,6 +78,10 @@ public class SnapshotTracker { return collectedChunks; } + String getLeaderId() { + return leaderId; + } + public static class InvalidChunkException extends Exception { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java index df468cf24c..b01fd33914 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java @@ -15,8 +15,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; 
import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -792,7 +790,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 1 : 0); int lastIncludedIndex = 1; int chunkIndex = 1; InstallSnapshot lastInstallSnapshot = null; @@ -855,7 +853,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { ByteString bsSnapshot = createSnapshot(); int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 
1 : 0); int lastIncludedIndex = 1; // Check that snapshot installation is not in progress @@ -873,19 +871,61 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { assertNotNull(follower.getSnapshotTracker()); // Send an append entry - AppendEntries appendEntries = mock(AppendEntries.class); - doReturn(context.getTermInformation().getCurrentTerm()).when(appendEntries).getTerm(); + AppendEntries appendEntries = new AppendEntries(1, "leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 1, "3")), 2, -1, (short)1); follower.handleMessage(leaderActor, appendEntries); AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); - assertEquals(context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); - assertEquals(context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); - assertEquals(context.getTermInformation().getCurrentTerm(), reply.getTerm()); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", context.getReplicatedLog().lastIndex(), reply.getLogLastIndex()); + assertEquals("getLogLastTerm", context.getReplicatedLog().lastTerm(), reply.getLogLastTerm()); + assertEquals("getTerm", context.getTermInformation().getCurrentTerm(), reply.getTerm()); + + assertNotNull(follower.getSnapshotTracker()); + } + + @Test + public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception { + logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader"); + + MockRaftActorContext context = createActorContext(); + + follower = createBehavior(context); - // We should not hit the code that needs to look at prevLogIndex because we are short circuiting - verify(appendEntries, never()).getPrevLogIndex(); + ByteString bsSnapshot = createSnapshot(); + int snapshotLength = bsSnapshot.size(); + int chunkSize = 50; + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 
1 : 0); + int lastIncludedIndex = 1; + // Check that snapshot installation is not in progress + assertNull(follower.getSnapshotTracker()); + + // Make sure that we have more than 1 chunk to send + assertTrue(totalChunks > 1); + + // Send an install snapshot with the first chunk to start the process of installing a snapshot + byte[] chunkData = getNextChunk(bsSnapshot, 0, chunkSize); + follower.handleMessage(leaderActor, new InstallSnapshot(1, "leader", lastIncludedIndex, 1, + chunkData, 1, totalChunks)); + + // Check if snapshot installation is in progress now + assertNotNull(follower.getSnapshotTracker()); + + // Send appendEntries with a new term and leader. + AppendEntries appendEntries = new AppendEntries(2, "new-leader", 1, 1, + Arrays.asList(newReplicatedLogEntry(2, 2, "3")), 2, -1, (short)1); + + follower.handleMessage(leaderActor, appendEntries); + + AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor, AppendEntriesReply.class); + assertEquals("isSuccess", true, reply.isSuccess()); + assertEquals("getLogLastIndex", 2, reply.getLogLastIndex()); + assertEquals("getLogLastTerm", 2, reply.getLogLastTerm()); + assertEquals("getTerm", 2, reply.getTerm()); + + assertNull(follower.getSnapshotTracker()); } @Test @@ -901,7 +941,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { int offset = 0; int snapshotLength = bsSnapshot.size(); int chunkSize = 50; - int totalChunks = (snapshotLength / chunkSize) + ((snapshotLength % chunkSize) > 0 ? 1 : 0); + int totalChunks = snapshotLength / chunkSize + (snapshotLength % chunkSize > 0 ? 
1 : 0); int lastIncludedIndex = 1; int chunkIndex = 1; InstallSnapshot lastInstallSnapshot = null; @@ -1050,7 +1090,7 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest { if (chunkSize > snapshotLength) { size = snapshotLength; } else { - if ((start + chunkSize) > snapshotLength) { + if (start + chunkSize > snapshotLength) { size = snapshotLength - start; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java index c7d98b083c..c1bc215b6b 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java @@ -48,14 +48,14 @@ public class SnapshotTrackerTest { @Test public void testAddChunk() throws SnapshotTracker.InvalidChunkException { - SnapshotTracker tracker1 = new SnapshotTracker(logger, 5); + SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader"); tracker1.addChunk(1, chunk1, Optional.absent()); tracker1.addChunk(2, chunk2, Optional.absent()); tracker1.addChunk(3, chunk3, Optional.absent()); // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker - SnapshotTracker tracker2 = new SnapshotTracker(logger, 2); + SnapshotTracker tracker2 = new SnapshotTracker(logger, 2, "leader"); tracker2.addChunk(1, chunk1, Optional.absent()); tracker2.addChunk(2, chunk2, Optional.absent()); @@ -68,7 +68,7 @@ public class SnapshotTrackerTest { } // The first chunk's index must at least be FIRST_CHUNK_INDEX - SnapshotTracker tracker3 = new SnapshotTracker(logger, 2); + SnapshotTracker tracker3 = new SnapshotTracker(logger, 2, "leader"); try { tracker3.addChunk(AbstractLeader.FIRST_CHUNK_INDEX - 1, chunk1, 
Optional.absent()); @@ -78,7 +78,7 @@ public class SnapshotTrackerTest { } // Out of sequence chunk indexes won't work - SnapshotTracker tracker4 = new SnapshotTracker(logger, 2); + SnapshotTracker tracker4 = new SnapshotTracker(logger, 2, "leader"); tracker4.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); @@ -91,7 +91,7 @@ public class SnapshotTrackerTest { // No exceptions will be thrown when invalid chunk is added with the right sequence // If the lastChunkHashCode is missing - SnapshotTracker tracker5 = new SnapshotTracker(logger, 2); + SnapshotTracker tracker5 = new SnapshotTracker(logger, 2, "leader"); tracker5.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); // Look I can add the same chunk again @@ -99,7 +99,7 @@ public class SnapshotTrackerTest { // An exception will be thrown when an invalid chunk is addedd with the right sequence // when the lastChunkHashCode is present - SnapshotTracker tracker6 = new SnapshotTracker(logger, 2); + SnapshotTracker tracker6 = new SnapshotTracker(logger, 2, "leader"); tracker6.addChunk(AbstractLeader.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1)); @@ -117,7 +117,7 @@ public class SnapshotTrackerTest { public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException { // Trying to get a snapshot before all chunks have been received will throw an exception - SnapshotTracker tracker1 = new SnapshotTracker(logger, 5); + SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader"); tracker1.addChunk(1, chunk1, Optional.absent()); try { @@ -127,7 +127,7 @@ public class SnapshotTrackerTest { } - SnapshotTracker tracker2 = new SnapshotTracker(logger, 3); + SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader"); tracker2.addChunk(1, chunk1, Optional.of(AbstractLeader.INITIAL_LAST_CHUNK_HASH_CODE)); tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1))); @@ -140,7 +140,7 @@ public class SnapshotTrackerTest { @Test public void testGetCollectedChunks() 
throws SnapshotTracker.InvalidChunkException { - SnapshotTracker tracker1 = new SnapshotTracker(logger, 5); + SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader"); ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2)); @@ -156,7 +156,7 @@ public class SnapshotTrackerTest { if (size > snapshotLength) { size = snapshotLength; } else { - if ((start + size) > snapshotLength) { + if (start + size > snapshotLength) { size = snapshotLength - start; } } @@ -192,4 +192,4 @@ public class SnapshotTrackerTest { } -} \ No newline at end of file +}