Bug 8206: Fix IOException from initiateCaptureSnapshot 97/54897/4
authorTom Pantelis <tompantelis@gmail.com>
Wed, 12 Apr 2017 19:49:23 +0000 (15:49 -0400)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 14 Apr 2017 19:08:03 +0000 (21:08 +0200)
Modified the install snapshot chunking to be idempotent to avoid attempts
to send the same chunk twice. This fixes the error:

java.io.IOException: The # of bytes read from the imput stream, -1, does not match the expected # 3075

Change-Id: I5336c88125f226d0976f0d7fe17d03c0d181e12d
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 4382cfec3237075087837b5f1402cadab249188f..d855507fbba7d6e3dd6386ddf74993f3458e1f21 100644 (file)
@@ -531,10 +531,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 installSnapshotState.markSendStatus(false);
             }
 
-            if (wasLastChunk && !context.getSnapshotManager().isCapturing()) {
-                // Since the follower is now caught up try to purge the log.
-                purgeInMemoryLog();
-            } else if (!wasLastChunk && installSnapshotState.canSendNextChunk()) {
+            if (wasLastChunk) {
+                if (!context.getSnapshotManager().isCapturing()) {
+                    // Since the follower is now caught up try to purge the log.
+                    purgeInMemoryLog();
+                }
+            } else {
                 ActorSelection followerActor = context.getPeerActorSelection(followerId);
                 if (followerActor != null) {
                     sendSnapshotChunk(followerActor, followerLogInformation);
@@ -802,6 +804,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 // Ensure the snapshot bytes are set - this is a no-op.
                 installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
+                if (!installSnapshotState.canSendNextChunk()) {
+                    return;
+                }
+
                 byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
                 log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
index 5d47dbd02ec9754e94e5bf0f3c81e1a769caf6ba..3b4c7d813309a3ddb0b03520ebf1254623578339 100644 (file)
@@ -94,7 +94,8 @@ public final class LeaderInstallSnapshotState implements AutoCloseable {
 
     boolean canSendNextChunk() {
         // we only send a false if a chunk is sent but we have not received a reply yet
-        return snapshotBytes != null && replyReceivedForOffset == offset;
+        return snapshotBytes != null && (nextChunkHashCode == INITIAL_LAST_CHUNK_HASH_CODE
+                || replyReceivedForOffset == offset);
     }
 
     boolean isLastChunk(int index) {
@@ -127,7 +128,7 @@ public final class LeaderInstallSnapshotState implements AutoCloseable {
         int numRead = snapshotInputStream.read(nextChunk);
         if (numRead != size) {
             throw new IOException(String.format(
-                    "The # of bytes read from the imput stream, %d, does not match the expected # %d", numRead, size));
+                    "The # of bytes read from the input stream, %d, does not match the expected # %d", numRead, size));
         }
 
         nextChunkHashCode = Arrays.hashCode(nextChunk);
index d92f0729f207c077f7b38a1427d62a535e49cd0f..05b1d34a0f8b921cffbd192c0b9bb9cfefc6c494 100644 (file)
@@ -20,6 +20,8 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
@@ -35,6 +37,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
 
     private ActorSystem system;
     private RaftPolicy raftPolicy;
+    private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
 
     private static ElectionTerm newElectionTerm() {
         return new ElectionTerm() {
@@ -125,7 +128,7 @@ public class MockRaftActorContext extends RaftActorContextImpl {
     @Override
     public SnapshotManager getSnapshotManager() {
         SnapshotManager snapshotManager = super.getSnapshotManager();
-        snapshotManager.setCreateSnapshotConsumer(out -> { });
+        snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
 
         snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
             @Override
@@ -145,6 +148,10 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         return snapshotManager;
     }
 
+    public void setCreateSnapshotProcedure(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+        this.createSnapshotProcedure = createSnapshotProcedure;
+    }
+
     @Override
     public RaftPolicy getRaftPolicy() {
         return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
index 76322eec1a4c6de7756d4ca9f3447b997693c3e0..c3d33e12ec41ae744079bf864a4ab7d769f632f8 100644 (file)
@@ -31,12 +31,14 @@ import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
@@ -738,6 +740,9 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
 
         actorContext.getReplicatedLog().removeFrom(0);
 
+        AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+        actorContext.setCreateSnapshotProcedure(out -> installSnapshotStream.set(out));
+
         leader = new Leader(actorContext);
         actorContext.setCurrentBehavior(leader);
 
@@ -768,16 +773,35 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         assertEquals("isCapturing", true, actorContext.getSnapshotManager().isCapturing());
 
         CaptureSnapshot cs = actorContext.getSnapshotManager().getCaptureSnapshot();
-
         assertEquals(3, cs.getLastAppliedIndex());
         assertEquals(1, cs.getLastAppliedTerm());
         assertEquals(4, cs.getLastIndex());
         assertEquals(2, cs.getLastTerm());
 
-        // if an initiate is started again when first is in progress, it should not initiate Capture
+        assertNotNull("Create snapshot procedure not invoked", installSnapshotStream.get());
+        assertTrue("Install snapshot stream present", installSnapshotStream.get().isPresent());
+
+        MessageCollectorActor.clearMessages(followerActor);
+
+        // Sending Replicate message should not initiate another capture since the first is in progress.
         leader.handleMessage(leaderActor, new Replicate(null, new MockIdentifier("state-id"), entry, true));
+        assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
 
+        // Similarly sending another AppendEntriesReply to force a snapshot should not initiate another capture.
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
         assertSame("CaptureSnapshot instance", cs, actorContext.getSnapshotManager().getCaptureSnapshot());
+
+        // Now simulate the CaptureSnapshotReply to initiate snapshot install - the first chunk should be sent.
+        final byte[] bytes = new byte[]{1, 2, 3};
+        installSnapshotStream.get().get().write(bytes);
+        actorContext.getSnapshotManager().persist(ByteState.of(bytes), installSnapshotStream.get(),
+                Runtime.getRuntime().totalMemory());
+        MessageCollectorActor.expectFirstMatching(followerActor, InstallSnapshot.class);
+
+        // Sending another AppendEntriesReply to force a snapshot should be a no-op and not try to re-send the chunk.
+        MessageCollectorActor.clearMessages(followerActor);
+        leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, false, 1, 1, (short) 1, true));
+        MessageCollectorActor.assertNoneMatching(followerActor, InstallSnapshot.class, 200);
     }