Reset snapshot progress after timeout has been hit
author Tomas Cere <tomas.cere@pantheon.tech>
Tue, 30 Apr 2019 08:49:02 +0000 (10:49 +0200)
committer Tom Pantelis <tompantelis@gmail.com>
Tue, 7 May 2019 02:43:52 +0000 (02:43 +0000)
Add a stopwatch to LeaderInstallSnapshotState that lets us track
whether a follower has taken too long, for whatever reason, to respond
to an InstallSnapshot chunk, so that chunks which time out can be
retried.

JIRA: CONTROLLER-1891
Change-Id: Id443a4cc7a069ad4d9982d537d9a8d82b845ac35
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
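
For context, the chunk-retry timeout introduced below is computed as
heartBeatInterval * (electionTimeoutFactor * 3). A minimal runnable
sketch of that arithmetic follows; the 500 ms heartbeat interval and
election timeout factor of 20 are assumptions implied by the patch's
"30 seconds with default settings" comment, and the class itself is
hypothetical, not part of the change.

    import java.util.concurrent.TimeUnit;
    import scala.concurrent.duration.FiniteDuration;

    public class SnapshotReplyTimeoutSketch {
        public static void main(String[] args) {
            // Assumed defaults: 500 ms heartbeat, election timeout factor 20.
            FiniteDuration heartBeatInterval = new FiniteDuration(500, TimeUnit.MILLISECONDS);
            long electionTimeoutFactor = 20;

            // Same arithmetic as the patch: heartbeat * (factor * 3).
            FiniteDuration snapshotReplyTimeout =
                    heartBeatInterval.$times(electionTimeoutFactor * 3);

            System.out.println(snapshotReplyTimeout); // "30000 milliseconds"
        }
    }
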
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java

index fef6cc8b7c57a142d683d1e0f27214ad751fd9d3..ec465935039eb99a0c34341e05aa93bd2340e34d 100644
@@ -533,6 +533,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return;
         }
 
+        installSnapshotState.resetChunkTimer();
         followerLogInformation.markFollowerActive();
 
         if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
@@ -664,9 +665,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
             LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
             if (installSnapshotState != null) {
                 // if install snapshot is in process, then send next chunk if possible
-                if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
-                    sendSnapshotChunk(followerActor, followerLogInformation);
+                if (isFollowerActive) {
+                    // 30 seconds with default settings; tunable via the heartbeat interval or election timeout factor
+                    FiniteDuration snapshotReplyTimeout = context.getConfigParams().getHeartBeatInterval()
+                            .$times(context.getConfigParams().getElectionTimeoutFactor() * 3);
+
+                    if (installSnapshotState.isChunkTimedOut(snapshotReplyTimeout)) {
+                        sendAppendEntries = !resendSnapshotChunk(followerActor, followerLogInformation);
+                    } else if (installSnapshotState.canSendNextChunk()) {
+                        sendSnapshotChunk(followerActor, followerLogInformation);
+                    }
                 } else if (sendHeartbeat) {
                     // we send a heartbeat even if we have not received a reply for the last chunk
                     sendAppendEntries = true;
@@ -931,18 +941,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                     serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
                 }
 
-                followerActor.tell(
-                    new InstallSnapshot(currentTerm(), context.getId(),
-                        snapshotHolder.get().getLastIncludedIndex(),
-                        snapshotHolder.get().getLastIncludedTerm(),
-                        nextSnapshotChunk,
-                        nextChunkIndex,
-                        installSnapshotState.getTotalChunks(),
-                        Optional.of(installSnapshotState.getLastChunkHashCode()),
-                        serverConfig
-                    ).toSerializable(followerLogInfo.getRaftVersion()),
-                    actor()
-                );
+                sendSnapshotChunk(followerActor, followerLogInfo, nextSnapshotChunk, nextChunkIndex, serverConfig);
 
             } catch (IOException e) {
                 throw new RuntimeException(e);
@@ -953,6 +952,46 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         }
     }
 
+    private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo,
+                                   final byte[] snapshotChunk, final int chunkIndex,
+                                   final Optional<ServerConfigurationPayload> serverConfig) {
+        LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+
+        installSnapshotState.startChunkTimer();
+        followerActor.tell(
+                new InstallSnapshot(currentTerm(), context.getId(),
+                        snapshotHolder.get().getLastIncludedIndex(),
+                        snapshotHolder.get().getLastIncludedTerm(),
+                        snapshotChunk,
+                        chunkIndex,
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
+                        serverConfig
+                ).toSerializable(followerLogInfo.getRaftVersion()),
+                actor()
+        );
+    }
+
+    private boolean resendSnapshotChunk(final ActorSelection followerActor,
+                                        final FollowerLogInformation followerLogInfo) {
+        if (!snapshotHolder.isPresent()) {
+            // Seems like we should never hit this case, but just in case we do, reset the snapshot progress so that it
+            // can restart from the next AppendEntries.
+            log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName());
+            followerLogInfo.clearLeaderInstallSnapshotState();
+            return false;
+        }
+
+        LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+        // we are resending, so the chunk timer needs to be reset
+        installSnapshotState.resetChunkTimer();
+        installSnapshotState.markSendStatus(false);
+
+        sendSnapshotChunk(followerActor, followerLogInfo);
+
+        return true;
+    }
+
     private void sendHeartBeat() {
         if (!followerToLog.isEmpty()) {
             log.trace("{}: Sending heartbeat", logName());
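
The resend decision above hinges on isChunkTimedOut() and the chunk
timer it consults. Below is a minimal runnable sketch of the
underlying Guava Stopwatch pattern; the class is a hypothetical
stand-in, the real start/reset/timeout methods live in
LeaderInstallSnapshotState (next file). Note the strict whole-second
comparison, mirroring isChunkTimedOut().

    import com.google.common.base.Stopwatch;
    import java.util.concurrent.TimeUnit;

    public class ChunkTimerSketch {
        public static void main(String[] args) throws InterruptedException {
            Stopwatch chunkTimer = Stopwatch.createUnstarted();

            chunkTimer.start();           // startChunkTimer(): chunk was sent
            Thread.sleep(2100);           // ... no InstallSnapshotReply ...

            long timeoutSeconds = 1;      // stand-in for snapshotReplyTimeout
            boolean timedOut = chunkTimer.elapsed(TimeUnit.SECONDS) > timeoutSeconds;
            System.out.println(timedOut); // true: the chunk would be resent

            chunkTimer.reset();           // resetChunkTimer(): on reply or resend
            System.out.println(chunkTimer.elapsed(TimeUnit.SECONDS)); // 0
        }
    }
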
index 946c56bec091e6dba8c490a1c9ede601e8d7e532..23a0f6d027c53841644a6ab1c5273708a9e0a011 100644
@@ -7,12 +7,15 @@
  */
 package org.opendaylight.controller.cluster.raft.behaviors;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.io.ByteSource;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
@@ -43,6 +46,8 @@ public final class LeaderInstallSnapshotState implements AutoCloseable {
     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
     private long snapshotSize;
     private InputStream snapshotInputStream;
+    private final Stopwatch chunkTimer = Stopwatch.createUnstarted();
+    private byte[] currentChunk;
 
     LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
         this.snapshotChunkSize = snapshotChunkSize;
@@ -83,6 +88,18 @@ public final class LeaderInstallSnapshotState implements AutoCloseable {
         return chunkIndex;
     }
 
+    void startChunkTimer() {
+        chunkTimer.start();
+    }
+
+    void resetChunkTimer() {
+        chunkTimer.reset();
+    }
+
+    boolean isChunkTimedOut(final FiniteDuration timeout) {
+        return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
+    }
+
     int getChunkIndex() {
         return chunkIndex;
     }
@@ -115,26 +132,30 @@ public final class LeaderInstallSnapshotState implements AutoCloseable {
     }
 
     byte[] getNextChunk() throws IOException {
-        int start = incrementOffset();
-        int size = snapshotChunkSize;
-        if (snapshotChunkSize > snapshotSize) {
-            size = (int) snapshotSize;
-        } else if (start + snapshotChunkSize > snapshotSize) {
-            size = (int) (snapshotSize - start);
-        }
+        if (replyStatus || currentChunk == null) {
+            int start = incrementOffset();
+            int size = snapshotChunkSize;
+            if (snapshotChunkSize > snapshotSize) {
+                size = (int) snapshotSize;
+            } else if (start + snapshotChunkSize > snapshotSize) {
+                size = (int) (snapshotSize - start);
+            }
 
-        byte[] nextChunk = new byte[size];
-        int numRead = snapshotInputStream.read(nextChunk);
-        if (numRead != size) {
-            throw new IOException(String.format(
-                    "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
-        }
+            currentChunk = new byte[size];
+            int numRead = snapshotInputStream.read(currentChunk);
+            if (numRead != size) {
+                throw new IOException(String.format(
+                        "The # of bytes read from the input stream, %d, "
+                                + "does not match the expected # %d", numRead, size));
+            }
+
+            nextChunkHashCode = Arrays.hashCode(currentChunk);
 
-        nextChunkHashCode = Arrays.hashCode(nextChunk);
+            LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
+                    snapshotSize, start, size, nextChunkHashCode);
+        }
 
-        LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
-                snapshotSize, start, size, nextChunkHashCode);
-        return nextChunk;
+        return currentChunk;
     }
 
     /**
@@ -142,11 +163,13 @@ public final class LeaderInstallSnapshotState implements AutoCloseable {
      */
     void reset() {
         closeStream();
+        chunkTimer.reset();
 
         offset = 0;
         replyStatus = false;
         replyReceivedForOffset = offset;
         chunkIndex = FIRST_CHUNK_INDEX;
+        currentChunk = null;
         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
 
         try {
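
The retry path relies on getNextChunk() now being idempotent until a
chunk is acknowledged: the bytes are cached in currentChunk and the
stream is only advanced once replyStatus is true again. A
self-contained model of that contract follows (hypothetical stand-in
class; the real implementation reads from an InputStream and tracks
offsets differently):

    import java.util.Arrays;

    public class ChunkCacheSketch {
        private final byte[] data = {1, 2, 3, 4, 5, 6};
        private final int chunkSize = 2;
        private int offset = -2;             // advances only after an ack
        private boolean replyStatus = false;
        private byte[] currentChunk = null;

        byte[] getNextChunk() {
            // Re-read only on the first call or after a successful reply;
            // otherwise return the cached chunk so a resend is identical.
            if (replyStatus || currentChunk == null) {
                offset += chunkSize;
                replyStatus = false;         // simplified; real code tracks offsets
                currentChunk = Arrays.copyOfRange(data, offset, offset + chunkSize);
            }
            return currentChunk;
        }

        void markSendStatus(final boolean success) {
            replyStatus = success;
        }

        public static void main(String[] args) {
            ChunkCacheSketch state = new ChunkCacheSketch();
            byte[] first = state.getNextChunk();  // {1, 2}
            state.markSendStatus(false);          // reply timed out
            byte[] again = state.getNextChunk();  // cached {1, 2} again
            System.out.println(Arrays.equals(first, again));            // true
            state.markSendStatus(true);           // follower acknowledged
            System.out.println(Arrays.toString(state.getNextChunk())); // [3, 4]
        }
    }
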
index fa974b0ca42eee0b704df9afd1dd9ead437b18c9..d6a53a0aeeb6e84b86d457b3ca667dbaee111101 100644
@@ -95,6 +95,16 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("Leader created and elected");
     }
 
+    private void setupFollower2() {
+        follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+                follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
+
+        follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+        follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
+
+        follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+    }
+
     /**
      * Send 2 payload instances with follower 2 lagging then resume the follower and verifies it gets
      * caught up via AppendEntries.
@@ -383,6 +393,80 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
     }
 
+    /**
+     * Tests whether the leader reattempts to send a snapshot when a follower crashes before replying with
+     * InstallSnapshotReply after the last chunk has been sent.
+     */
+    @Test
+    public void testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation() throws Exception {
+        testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation starting");
+
+        setup();
+
+        sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
+
+        // Stop follower 2 so it lags behind the leader.
+        follower2Actor.stop();
+
+        // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+        Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+                TimeUnit.MILLISECONDS);
+
+        // Send 5 payloads - the second should cause a leader snapshot.
+        final MockPayload payload2 = sendPayloadData(leaderActor, "two");
+        final MockPayload payload3 = sendPayloadData(leaderActor, "three");
+        final MockPayload payload4 = sendPayloadData(leaderActor, "four");
+        final MockPayload payload5 = sendPayloadData(leaderActor, "five");
+        final MockPayload payload6 = sendPayloadData(leaderActor, "six");
+
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
+        List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
+        verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
+        verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
+        verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
+
+        MessageCollectorActor.clearMessages(leaderCollectorActor);
+
+        testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation: "
+                + "sending 1 more payload to trigger second snapshot");
+
+        // Send another payload to trigger a second leader snapshot.
+        MockPayload payload7 = sendPayloadData(leaderActor, "seven");
+
+        MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+        ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+        verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
+
+        // Verify follower 1 applies each log entry.
+        applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
+        verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
+        verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
+        verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
+
+        leaderActor.underlyingActor()
+                .startDropMessages(InstallSnapshotReply.class, reply -> reply.getChunkIndex() == 5);
+
+        setupFollower2();
+
+        MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 5);
+
+        follower2Actor.stop();
+
+        // Get rid of follower 2's persisted snapshots so it restarts with clean state.
+        InMemorySnapshotStore.clearSnapshotsFor(follower2Id);
+
+        leaderActor.underlyingActor().stopDropMessages(InstallSnapshotReply.class);
+
+        MessageCollectorActor.clearMessages(follower2CollectorActor);
+        setupFollower2();
+
+        MessageCollectorActor.expectMatching(follower2CollectorActor, SaveSnapshotSuccess.class, 1);
+    }
+
     /**
      * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
      * leader snapshot such that the leader trims its log from the last applied index. Follower 2's log will
index 4c3ad09d5172a6b7fe0203cc8ea193ee2da2c11f..e9bd04bf86ebb7016193aafe8addfa1d0da4b04c 100644
@@ -70,6 +70,10 @@ public class InMemorySnapshotStore extends SnapshotStore {
         return retList;
     }
 
+    public static void clearSnapshotsFor(final String persistenceId) {
+        snapshots.remove(persistenceId);
+    }
+
     public static void clear() {
         snapshots.clear();
     }