From: Tomas Cere Date: Tue, 30 Apr 2019 08:49:02 +0000 (+0200) Subject: Reset snapshot progress after timeout has been hit X-Git-Tag: release/sodium~99 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f33beecf2a10955a9219757529ba3017079816cc Reset snapshot progress after timeout has been hit Add a stopwatch to LeaderInstallSnapshotState that allows us to keep track whether a follower took too much time to respond to InstallSnapshot for whatever reason allowing us to retry chunks that time out. JIRA: CONTROLLER-1891 Change-Id: Id443a4cc7a069ad4d9982d537d9a8d82b845ac35 Signed-off-by: Tomas Cere --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index fef6cc8b7c..ec46593503 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -533,6 +533,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return; } + installSnapshotState.resetChunkTimer(); followerLogInformation.markFollowerActive(); if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) { @@ -664,9 +665,18 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState(); if (installSnapshotState != null) { + // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && installSnapshotState.canSendNextChunk()) { - sendSnapshotChunk(followerActor, followerLogInformation); + if (isFollowerActive) { + // 30 seconds with default settings, can be modified via heartbeat or election timeout factor 
+ FiniteDuration snapshotReplyTimeout = context.getConfigParams().getHeartBeatInterval() + .$times(context.getConfigParams().getElectionTimeoutFactor() * 3); + + if (installSnapshotState.isChunkTimedOut(snapshotReplyTimeout)) { + sendAppendEntries = !resendSnapshotChunk(followerActor, followerLogInformation); + } else if (installSnapshotState.canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerLogInformation); + } } else if (sendHeartbeat) { // we send a heartbeat even if we have not received a reply for the last chunk sendAppendEntries = true; @@ -931,18 +941,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); } - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - snapshotHolder.get().getLastIncludedIndex(), - snapshotHolder.get().getLastIncludedTerm(), - nextSnapshotChunk, - nextChunkIndex, - installSnapshotState.getTotalChunks(), - Optional.of(installSnapshotState.getLastChunkHashCode()), - serverConfig - ).toSerializable(followerLogInfo.getRaftVersion()), - actor() - ); + sendSnapshotChunk(followerActor, followerLogInfo, nextSnapshotChunk, nextChunkIndex, serverConfig); } catch (IOException e) { throw new RuntimeException(e); @@ -953,6 +952,46 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } + private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo, + final byte[] snapshotChunk, final int chunkIndex, + final Optional serverConfig) { + LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState(); + + installSnapshotState.startChunkTimer(); + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + snapshotHolder.get().getLastIncludedIndex(), + snapshotHolder.get().getLastIncludedTerm(), + snapshotChunk, + chunkIndex, + installSnapshotState.getTotalChunks(), + 
Optional.of(installSnapshotState.getLastChunkHashCode()), + serverConfig + ).toSerializable(followerLogInfo.getRaftVersion()), + actor() + ); + } + + private boolean resendSnapshotChunk(final ActorSelection followerActor, + final FollowerLogInformation followerLogInfo) { + if (!snapshotHolder.isPresent()) { + // Seems like we should never hit this case, but just in case we do, reset the snapshot progress so that it + // can restart from the next AppendEntries. + log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName()); + followerLogInfo.clearLeaderInstallSnapshotState(); + return false; + } + + LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState(); + // we are resending, timer needs to be reset + installSnapshotState.resetChunkTimer(); + installSnapshotState.markSendStatus(false); + + sendSnapshotChunk(followerActor, followerLogInfo); + + return true; + } + private void sendHeartBeat() { if (!followerToLog.isEmpty()) { log.trace("{}: Sending heartbeat", logName()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index 946c56bec0..23a0f6d027 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -7,12 +7,15 @@ */ package org.opendaylight.controller.cluster.raft.behaviors; +import com.google.common.base.Stopwatch; import com.google.common.io.ByteSource; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
scala.concurrent.duration.FiniteDuration; /** * Encapsulates the leader state and logic for sending snapshot chunks to a follower. @@ -43,6 +46,8 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; private long snapshotSize; private InputStream snapshotInputStream; + private Stopwatch chunkTimer = Stopwatch.createUnstarted(); + private byte[] currentChunk = null; LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) { this.snapshotChunkSize = snapshotChunkSize; @@ -83,6 +88,18 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { return chunkIndex; } + void startChunkTimer() { + chunkTimer.start(); + } + + void resetChunkTimer() { + chunkTimer.reset(); + } + + boolean isChunkTimedOut(final FiniteDuration timeout) { + return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds(); + } + int getChunkIndex() { return chunkIndex; } @@ -115,26 +132,30 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { } byte[] getNextChunk() throws IOException { - int start = incrementOffset(); - int size = snapshotChunkSize; - if (snapshotChunkSize > snapshotSize) { - size = (int) snapshotSize; - } else if (start + snapshotChunkSize > snapshotSize) { - size = (int) (snapshotSize - start); - } + if (replyStatus || currentChunk == null) { + int start = incrementOffset(); + int size = snapshotChunkSize; + if (snapshotChunkSize > snapshotSize) { + size = (int) snapshotSize; + } else if (start + snapshotChunkSize > snapshotSize) { + size = (int) (snapshotSize - start); + } - byte[] nextChunk = new byte[size]; - int numRead = snapshotInputStream.read(nextChunk); - if (numRead != size) { - throw new IOException(String.format( - "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size)); - } + currentChunk = new byte[size]; + int numRead = snapshotInputStream.read(currentChunk); + if (numRead != 
size) { + throw new IOException(String.format( + "The # of bytes read from the input stream, %d, " + + "does not match the expected # %d", numRead, size)); + } + + nextChunkHashCode = Arrays.hashCode(currentChunk); - nextChunkHashCode = Arrays.hashCode(nextChunk); + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, + snapshotSize, start, size, nextChunkHashCode); + } - LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, - snapshotSize, start, size, nextChunkHashCode); - return nextChunk; + return currentChunk; } /** @@ -142,11 +163,13 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { */ void reset() { closeStream(); + chunkTimer.reset(); offset = 0; replyStatus = false; replyReceivedForOffset = offset; chunkIndex = FIRST_CHUNK_INDEX; + currentChunk = null; lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; try { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java index fa974b0ca4..d6a53a0aee 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java @@ -95,6 +95,16 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A testLog.info("Leader created and elected"); } + private void setupFollower2() { + follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId), + follower1Id, testActorPath(follower1Id)), newFollowerConfigParams()); + + follower2Context = 
follower2Actor.underlyingActor().getRaftActorContext(); + follower2 = follower2Actor.underlyingActor().getCurrentBehavior(); + + follower2CollectorActor = follower2Actor.underlyingActor().collectorActor(); + } + /** * Send 2 payload instances with follower 2 lagging then resume the follower and verifies it gets * caught up via AppendEntries. @@ -383,6 +393,80 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete"); } + /** + * Tests whether the leader reattempts to send a snapshot when a follower crashes before replying with + * InstallSnapshotReply after the last chunk has been sent. + */ + @Test + public void testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation() throws Exception { + testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation starting"); + + setup(); + + sendInitialPayloadsReplicatedToAllFollowers("zero", "one"); + + // Stop follower 2 so that it misses the payloads sent below and lags behind. + follower2Actor.stop(); + + // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader. + Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5, + TimeUnit.MILLISECONDS); + + // Send 5 payloads - the second should cause a leader snapshot. + final MockPayload payload2 = sendPayloadData(leaderActor, "two"); + final MockPayload payload3 = sendPayloadData(leaderActor, "three"); + final MockPayload payload4 = sendPayloadData(leaderActor, "four"); + final MockPayload payload5 = sendPayloadData(leaderActor, "five"); + final MockPayload payload6 = sendPayloadData(leaderActor, "six"); + + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond. 
+ List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5); + verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2); + verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4); + verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6); + + MessageCollectorActor.clearMessages(leaderCollectorActor); + + testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation: " + + "sending 1 more payload to trigger second snapshot"); + + // Send another payload to trigger a second leader snapshot. + MockPayload payload7 = sendPayloadData(leaderActor, "seven"); + + MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class); + + + ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class); + verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7); + + // Verify follower 1 applies each log entry. 
+ applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6); + verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2); + verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4); + verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7); + + leaderActor.underlyingActor() + .startDropMessages(InstallSnapshotReply.class, reply -> reply.getChunkIndex() == 5); + + setupFollower2(); + + MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 5); + + follower2Actor.stop(); + + // need to get rid of persistence for follower2 + InMemorySnapshotStore.clearSnapshotsFor(follower2Id); + + leaderActor.underlyingActor().stopDropMessages(InstallSnapshotReply.class); + + MessageCollectorActor.clearMessages(follower2CollectorActor); + setupFollower2(); + + MessageCollectorActor.expectMatching(follower2CollectorActor, SaveSnapshotSuccess.class, 1); + } + /** * Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a * leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java index 4c3ad09d51..e9bd04bf86 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java @@ -70,6 +70,10 @@ public class InMemorySnapshotStore extends SnapshotStore { return retList; } + public static void clearSnapshotsFor(final String persistenceId) { + snapshots.remove(persistenceId); + } + public static void clear() { snapshots.clear(); }