installSnapshotState.markSendStatus(false);
}
- if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
- // Since the follower is now caught up try to purge the log.
- purgeInMemoryLog();
- } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
+ if (wasLastChunk) {
+ if (!context.getSnapshotManager().isCapturing()) {
+ // Since the follower is now caught up try to purge the log.
+ purgeInMemoryLog();
+ }
+ } else {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
sendSnapshotChunk(followerActor, followerLogInformation);
// Ensure the snapshot bytes are set - this is a no-op.
installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
+ if (!installSnapshotState.canSendNextChunk()) {
+ return;
+ }
+
byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
boolean canSendNextChunk() {
// we only send a false if a chunk is sent but we have not received a reply yet
- return snapshotBytes != null && replyReceivedForOffset == offset;
+ return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
+ || replyReceivedForOffset == offset);
}
boolean isLastChunk(int index) {
int numRead = snapshotInputStream.read(nextChunk);
if (numRead != size) {
throw new IOException(String.format(
- "The # of bytes read from the imput stream, %d, does not match the expected # %d", numRead, size));
+ "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
}
nextChunkHashCode = Arrays.hashCode(nextChunk);
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.NonPersistentDataProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.persisted.ByteState;
private ActorSystem system;
private RaftPolicy raftPolicy;
+ private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
private static ElectionTerm newElectionTerm() {
return new ElectionTerm() {
@Override
public SnapshotManager getSnapshotManager() {
SnapshotManager snapshotManager = super.getSnapshotManager();
- snapshotManager.setCreateSnapshotConsumer(out -> { });
+ snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
@Override
return snapshotManager;
}
+ public void setCreateSnapshotProcedure(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+ this.createSnapshotProcedure = createSnapshotProcedure;
+ }
+
@Override
public RaftPolicy getRaftPolicy() {
return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.ByteString;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
actorContext.getReplicatedLog().removeFrom(0);
+ AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+ actorContext.setCreateSnapshotProcedure(out -> installSnapshotStream.set(out));
+
leader = new Leader(actorContext);
actorContext.setCurrentBehavior(leader);
assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
-
assertEquals(3, cs.getLastAppliedIndex());
assertEquals(1, cs.getLastAppliedTerm());
assertEquals(4, cs.getLastIndex());
assertEquals(2, cs.getLastTerm());
- // if an initiate is started again when first is in progress, it should not initiate Capture
+ assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
+ assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
+
+ MessageCollectorActor.clearMessages(followerActor);
+
+ // Sending Replicate message should not initiate another capture since the first is in progress.
leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+ assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+ // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+ // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
+ final byte[] bytes = new byte[]{1, 2, 3};
+ installSnapshotStream.get().get().write(bytes);
+ actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
+ Runtime.getRuntime().totalMemory());
+ MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+ // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
+ MessageCollectorActor.clearMessages(followerActor);
+ leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+ MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
}