From 8049fd4d06da0f4616180e46fbbe95f98cf698ea Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Wed, 12 Apr 2017 15:49:23 -0400 Subject: [PATCH] Bug 8206: Fix IOException from initiateCaptureSnapshot Modified the install snapshot chunking to be idempotent to avoid attempts to send the same chunk twice. This fixes the error: java.io.IOException: The # of bytes read from the imput stream, -1, does not match the expected # 3075 Change-Id: I5336c88125f226d0976f0d7fe17d03c0d181e12d Signed-off-by: Tom Pantelis --- .../raft/behaviors/AbstractLeader.java | 14 +++++++--- .../behaviors/LeaderInstallSnapshotState.java | 5 ++-- .../cluster/raft/MockRaftActorContext.java | 9 +++++- .../cluster/raft/behaviors/LeaderTest.java | 28 +++++++++++++++++-- 4 files changed, 47 insertions(+), 9 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 4382cfec32..d855507fbb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -531,10 +531,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { installSnapshotState.markSendStatus(false); } - if (wasLastChunk && !context.getSnapshotManager().isCapturing()) { - // Since the follower is now caught up try to purge the log. - purgeInMemoryLog(); - } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) { + if (wasLastChunk) { + if (!context.getSnapshotManager().isCapturing()) { + // Since the follower is now caught up try to purge the log. 
+ purgeInMemoryLog(); + } + } else { ActorSelection followerActor = context.getPeerActorSelection(followerId); if (followerActor != null) { sendSnapshotChunk(followerActor, followerLogInformation); @@ -802,6 +804,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Ensure the snapshot bytes are set - this is a no-op. installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); + if (!installSnapshotState.canSendNextChunk()) { + return; + } + byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index 5d47dbd02e..3b4c7d8133 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -94,7 +94,8 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { boolean canSendNextChunk() { // we only send a false if a chunk is sent but we have not received a reply yet - return snapshotBytes != null && replyReceivedForOffset == offset; + return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE + || replyReceivedForOffset == offset); } boolean isLastChunk(int index) { @@ -127,7 +128,7 @@ public final class LeaderInstallSnapshotState implements AutoCloseable { int numRead = snapshotInputStream.read(nextChunk); if (numRead != size) { throw new IOException(String.format( - "The # of bytes read from the imput stream, %d, does not match the expected # %d", numRead, size)); + "The # of bytes read 
from the input stream, %d, does not match the expected # %d", numRead, size)); } nextChunkHashCode = Arrays.hashCode(nextChunk); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java index d92f0729f2..05b1d34a0f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java @@ -20,6 +20,8 @@ import java.io.OutputStream; import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; import org.opendaylight.controller.cluster.NonPersistentDataProvider; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.persisted.ByteState; @@ -35,6 +37,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { private ActorSystem system; private RaftPolicy raftPolicy; + private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { }; private static ElectionTerm newElectionTerm() { return new ElectionTerm() { @@ -125,7 +128,7 @@ public class MockRaftActorContext extends RaftActorContextImpl { @Override public SnapshotManager getSnapshotManager() { SnapshotManager snapshotManager = super.getSnapshotManager(); - snapshotManager.setCreateSnapshotConsumer(out -> { }); + snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure); snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() { @Override @@ -145,6 +148,10 @@ public class MockRaftActorContext extends RaftActorContextImpl { return snapshotManager; } + public void setCreateSnapshotProcedure(Consumer<Optional<OutputStream>> createSnapshotProcedure) { + this.createSnapshotProcedure = createSnapshotProcedure; + } +
@Override public RaftPolicy getRaftPolicy() { return raftPolicy != null ? raftPolicy : super.getRaftPolicy(); diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 76322eec1a..c3d33e12ec 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -31,12 +31,14 @@ import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.OutputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Test; import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; @@ -738,6 +740,9 @@ public class LeaderTest extends AbstractLeaderTest { actorContext.getReplicatedLog().removeFrom(0); + AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>(); + actorContext.setCreateSnapshotProcedure(out -> installSnapshotStream.set(out)); + leader = new Leader(actorContext); actorContext.setCurrentBehavior(leader); @@ -768,16 +773,35 @@ public class LeaderTest extends AbstractLeaderTest { assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing()); CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot(); - assertEquals(3, cs.getLastAppliedIndex()); assertEquals(1, cs.getLastAppliedTerm()); assertEquals(4, cs.getLastIndex()); assertEquals(2, cs.getLastTerm()); - // if an initiate is started again when first is in progress, it
should not initiate Capture + assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get()); + assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent()); + + MessageCollectorActor.clearMessages(followerActor); + + // Sending Replicate message should not initiate another capture since the first is in progress. leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true)); + assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture. + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot()); + + // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent. + final byte[] bytes = new byte[]{1, 2, 3}; + installSnapshotStream.get().get().write(bytes); + actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(), + Runtime.getRuntime().totalMemory()); + MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class); + + // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk. + MessageCollectorActor.clearMessages(followerActor); + leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true)); + MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200); } -- 2.36.6