return;
}
+ installSnapshotState.resetChunkTimer();
followerLogInformation.markFollowerActive();
if (installSnapshotState.getChunkIndex() == reply.getChunkIndex()) {
LeaderInstallSnapshotState installSnapshotState = followerLogInformation.getInstallSnapshotState();
if (installSnapshotState != null) {
+
// if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && installSnapshotState.canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerLogInformation);
+ if (isFollowerActive) {
+ // 30 seconds with default settings, can be modified via heartbeat or election timeout factor
+ FiniteDuration snapshotReplyTimeout = context.getConfigParams().getHeartBeatInterval()
+ .$times(context.getConfigParams().getElectionTimeoutFactor() * 3);
+
+ if (installSnapshotState.isChunkTimedOut(snapshotReplyTimeout)) {
+ sendAppendEntries = !resendSnapshotChunk(followerActor, followerLogInformation);
+ } else if (installSnapshotState.canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerLogInformation);
+ }
} else if (sendHeartbeat) {
// we send a heartbeat even if we have not received a reply for the last chunk
sendAppendEntries = true;
serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
}
- followerActor.tell(
- new InstallSnapshot(currentTerm(), context.getId(),
- snapshotHolder.get().getLastIncludedIndex(),
- snapshotHolder.get().getLastIncludedTerm(),
- nextSnapshotChunk,
- nextChunkIndex,
- installSnapshotState.getTotalChunks(),
- Optional.of(installSnapshotState.getLastChunkHashCode()),
- serverConfig
- ).toSerializable(followerLogInfo.getRaftVersion()),
- actor()
- );
+ sendSnapshotChunk(followerActor, followerLogInfo, nextSnapshotChunk, nextChunkIndex, serverConfig);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo,
+ final byte[] snapshotChunk, final int chunkIndex,
+ final Optional<ServerConfigurationPayload> serverConfig) {
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+
+ installSnapshotState.startChunkTimer();
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ snapshotHolder.get().getLastIncludedIndex(),
+ snapshotHolder.get().getLastIncludedTerm(),
+ snapshotChunk,
+ chunkIndex,
+ installSnapshotState.getTotalChunks(),
+ Optional.of(installSnapshotState.getLastChunkHashCode()),
+ serverConfig
+ ).toSerializable(followerLogInfo.getRaftVersion()),
+ actor()
+ );
+ }
+
+ private boolean resendSnapshotChunk(final ActorSelection followerActor,
+ final FollowerLogInformation followerLogInfo) {
+ if (!snapshotHolder.isPresent()) {
+ // Seems like we should never hit this case, but just in case we do, reset the snapshot progress so that it
+ // can restart from the next AppendEntries.
+ log.warn("{}: Attempting to resend snapshot with no snapshot holder present.", logName());
+ followerLogInfo.clearLeaderInstallSnapshotState();
+ return false;
+ }
+
+ LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
+ // we are resending, timer needs to be reset
+ installSnapshotState.resetChunkTimer();
+ installSnapshotState.markSendStatus(false);
+
+ sendSnapshotChunk(followerActor, followerLogInfo);
+
+ return true;
+ }
+
private void sendHeartBeat() {
if (!followerToLog.isEmpty()) {
log.trace("{}: Sending heartbeat", logName());
*/
package org.opendaylight.controller.cluster.raft.behaviors;
+import com.google.common.base.Stopwatch;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
/**
* Encapsulates the leader state and logic for sending snapshot chunks to a follower.
private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
private long snapshotSize;
private InputStream snapshotInputStream;
+ private Stopwatch chunkTimer = Stopwatch.createUnstarted();
+ private byte[] currentChunk = null;
LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
this.snapshotChunkSize = snapshotChunkSize;
return chunkIndex;
}
+ void startChunkTimer() {
+ chunkTimer.start();
+ }
+
+ void resetChunkTimer() {
+ chunkTimer.reset();
+ }
+
+ boolean isChunkTimedOut(final FiniteDuration timeout) {
+ return chunkTimer.elapsed(TimeUnit.SECONDS) > timeout.toSeconds();
+ }
+
int getChunkIndex() {
return chunkIndex;
}
}
byte[] getNextChunk() throws IOException {
- int start = incrementOffset();
- int size = snapshotChunkSize;
- if (snapshotChunkSize > snapshotSize) {
- size = (int) snapshotSize;
- } else if (start + snapshotChunkSize > snapshotSize) {
- size = (int) (snapshotSize - start);
- }
+ if (replyStatus || currentChunk == null) {
+ int start = incrementOffset();
+ int size = snapshotChunkSize;
+ if (snapshotChunkSize > snapshotSize) {
+ size = (int) snapshotSize;
+ } else if (start + snapshotChunkSize > snapshotSize) {
+ size = (int) (snapshotSize - start);
+ }
- byte[] nextChunk = new byte[size];
- int numRead = snapshotInputStream.read(nextChunk);
- if (numRead != size) {
- throw new IOException(String.format(
- "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
- }
+ currentChunk = new byte[size];
+ int numRead = snapshotInputStream.read(currentChunk);
+ if (numRead != size) {
+ throw new IOException(String.format(
+ "The # of bytes read from the input stream, %d,"
+ + "does not match the expected # %d", numRead, size));
+ }
+
+ nextChunkHashCode = Arrays.hashCode(currentChunk);
- nextChunkHashCode = Arrays.hashCode(nextChunk);
+ LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
+ snapshotSize, start, size, nextChunkHashCode);
+ }
- LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
- snapshotSize, start, size, nextChunkHashCode);
- return nextChunk;
+ return currentChunk;
}
/**
*/
void reset() {
closeStream();
+ chunkTimer.reset();
offset = 0;
replyStatus = false;
replyReceivedForOffset = offset;
chunkIndex = FIRST_CHUNK_INDEX;
+ currentChunk = null;
lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
try {
testLog.info("Leader created and elected");
}
+ private void setupFollower2() {
+ follower2Actor = newTestRaftActor(follower2Id, ImmutableMap.of(leaderId, testActorPath(leaderId),
+ follower1Id, testActorPath(follower1Id)), newFollowerConfigParams());
+
+ follower2Context = follower2Actor.underlyingActor().getRaftActorContext();
+ follower2 = follower2Actor.underlyingActor().getCurrentBehavior();
+
+ follower2CollectorActor = follower2Actor.underlyingActor().collectorActor();
+ }
+
/**
* Send 2 payload instances with follower 2 lagging then resume the follower and verifies it gets
* caught up via AppendEntries.
testLog.info("testLeaderSnapshotWithLaggingFollowerCaughtUpViaInstallSnapshot complete");
}
+ /**
+ * Tests whether the leader reattempts to send a snapshot when a follower crashes before replying with
+ * InstallSnapshotReply after the last chunk has been sent.
+ */
+ @Test
+ public void testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation() throws Exception {
+ testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation starting");
+
+ setup();
+
+ sendInitialPayloadsReplicatedToAllFollowers("zero", "one");
+
+ // Configure follower 2 to drop messages and lag.
+ follower2Actor.stop();
+
+ // Sleep for at least the election timeout interval so follower 2 is deemed inactive by the leader.
+ Uninterruptibles.sleepUninterruptibly(leaderConfigParams.getElectionTimeOutInterval().toMillis() + 5,
+ TimeUnit.MILLISECONDS);
+
+ // Send 5 payloads - the second should cause a leader snapshot.
+ final MockPayload payload2 = sendPayloadData(leaderActor, "two");
+ final MockPayload payload3 = sendPayloadData(leaderActor, "three");
+ final MockPayload payload4 = sendPayloadData(leaderActor, "four");
+ final MockPayload payload5 = sendPayloadData(leaderActor, "five");
+ final MockPayload payload6 = sendPayloadData(leaderActor, "six");
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+ // Verify the leader got consensus and applies each log entry even though follower 2 didn't respond.
+ List<ApplyState> applyStates = MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyState.class, 5);
+ verifyApplyState(applyStates.get(0), leaderCollectorActor, payload2.toString(), currentTerm, 2, payload2);
+ verifyApplyState(applyStates.get(2), leaderCollectorActor, payload4.toString(), currentTerm, 4, payload4);
+ verifyApplyState(applyStates.get(4), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
+
+ MessageCollectorActor.clearMessages(leaderCollectorActor);
+
+ testLog.info("testLeaderInstallsSnapshotWithRestartedFollowerDuringSnapshotInstallation: "
+ + "sending 1 more payload to trigger second snapshot");
+
+ // Send another payload to trigger a second leader snapshot.
+ MockPayload payload7 = sendPayloadData(leaderActor, "seven");
+
+ MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
+
+
+ ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
+ verifyApplyState(applyState, leaderCollectorActor, payload7.toString(), currentTerm, 7, payload7);
+
+ // Verify follower 1 applies each log entry.
+ applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 6);
+ verifyApplyState(applyStates.get(0), null, null, currentTerm, 2, payload2);
+ verifyApplyState(applyStates.get(2), null, null, currentTerm, 4, payload4);
+ verifyApplyState(applyStates.get(5), null, null, currentTerm, 7, payload7);
+
+ leaderActor.underlyingActor()
+ .startDropMessages(InstallSnapshotReply.class, reply -> reply.getChunkIndex() == 5);
+
+ setupFollower2();
+
+ MessageCollectorActor.expectMatching(follower2CollectorActor, InstallSnapshot.class, 5);
+
+ follower2Actor.stop();
+
+ // need to get rid of persistence for follower2
+ InMemorySnapshotStore.clearSnapshotsFor(follower2Id);
+
+ leaderActor.underlyingActor().stopDropMessages(InstallSnapshotReply.class);
+
+ MessageCollectorActor.clearMessages(follower2CollectorActor);
+ setupFollower2();
+
+ MessageCollectorActor.expectMatching(follower2CollectorActor, SaveSnapshotSuccess.class, 1);
+ }
+
/**
* Send payloads with follower 2 lagging with the last payload having a large enough size to trigger a
* leader snapshot such that the leader trims its log from the last applied index.. Follower 2's log will
return retList;
}
+ public static void clearSnapshotsFor(final String persistenceId) {
+ snapshots.remove(persistenceId);
+ }
+
public static void clear() {
snapshots.clear();
}