From 5fd4213b5bfaf2db21f1b37139f6b98535a872c0 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Fri, 20 Jan 2017 15:32:37 -0500 Subject: [PATCH] Bug 7521: Convert install snapshot chunking to use streams On the leader side, converted LeaderInstallSnapshotState to use the InputStream from the ByteSource instead of a ByteString to chunk the data. On the follower side, converted the SnapshotTracker, which is used to reassemble the install snapshot chunks, to write the chunks to an OutputStream instead of a ByteString. Currently a ByteArrayOutputStream is used by will be changed to a FileBackedOutputStream in a subsequent patch. Change-Id: I7a16ad5d44a530e260aa332d91145fbc3fb95f5f Signed-off-by: Tom Pantelis --- .../raft/FollowerLogInformationImpl.java | 2 + .../raft/behaviors/AbstractLeader.java | 66 +++---- .../cluster/raft/behaviors/Follower.java | 18 +- .../behaviors/LeaderInstallSnapshotState.java | 72 +++++--- .../raft/behaviors/SnapshotTracker.java | 39 +++-- .../cluster/raft/behaviors/LeaderTest.java | 10 +- .../raft/behaviors/SnapshotTrackerTest.java | 161 +++++------------- 7 files changed, 174 insertions(+), 194 deletions(-) diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index b78c4fac22..c899967306 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -200,6 +200,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { @Override public void clearLeaderInstallSnapshotState() { + Preconditions.checkState(installSnapshotState != null); + installSnapshotState.close(); installSnapshotState = null; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 548b920fe7..12cbcc0df6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,8 +14,8 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.io.ByteSource; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -789,35 +789,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState); } - // Ensure the snapshot bytes are set - this is a no-op. - installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); - - byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); + try { + // Ensure the snapshot bytes are set - this is a no-op. + installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); - log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), - nextSnapshotChunk.length); + byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); - int nextChunkIndex = installSnapshotState.incrementChunkIndex(); - Optional serverConfig = Optional.absent(); - if (installSnapshotState.isLastChunk(nextChunkIndex)) { - serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); - } + log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), + nextSnapshotChunk.length); - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - snapshotHolder.get().getLastIncludedIndex(), - snapshotHolder.get().getLastIncludedTerm(), - nextSnapshotChunk, - nextChunkIndex, - installSnapshotState.getTotalChunks(), - Optional.of(installSnapshotState.getLastChunkHashCode()), - serverConfig - ).toSerializable(followerLogInfo.getRaftVersion()), - actor() - ); + int nextChunkIndex = installSnapshotState.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if (installSnapshotState.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } - log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), - installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + snapshotHolder.get().getLastIncludedIndex(), + snapshotHolder.get().getLastIncludedTerm(), + nextSnapshotChunk, + nextChunkIndex, + installSnapshotState.getTotalChunks(), + Optional.of(installSnapshotState.getLastChunkHashCode()), + serverConfig + ).toSerializable(followerLogInfo.getRaftVersion()), + actor() + ); + + log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), + installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); + } catch (IOException e) { + throw Throwables.propagate(e); + } } } @@ -912,16 +916,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { static class SnapshotHolder { private final long lastIncludedTerm; private final long lastIncludedIndex; - private final ByteString snapshotBytes; + private final ByteSource snapshotBytes; SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) { this.lastIncludedTerm = snapshot.getLastAppliedTerm(); this.lastIncludedIndex = snapshot.getLastAppliedIndex(); - try { - this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read()); - } catch (IOException e) { - throw new RuntimeException("Error reading state", e); - } + this.snapshotBytes = snapshotBytes; } long getLastIncludedTerm() { @@ -932,7 +932,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return lastIncludedIndex; } - ByteString getSnapshotBytes() { + ByteSource getSnapshotBytes() { return snapshotBytes; } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java index 727d6a3131..2cc2c261bb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java @@ -18,7 +18,6 @@ import akka.cluster.MemberStatus; import akka.japi.Procedure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; -import com.google.common.io.ByteSource; import java.io.IOException; import java.util.ArrayList; import java.util.Optional; @@ -530,9 +529,8 @@ public class Follower extends AbstractRaftActorBehavior { if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(), installSnapshot.getLastChunkHashCode())) { - ByteSource snapshotBytes = ByteSource.wrap(snapshotTracker.getSnapshot()); Snapshot snapshot = Snapshot.create( - context.getSnapshotManager().convertSnapshot(snapshotBytes), + context.getSnapshotManager().convertSnapshot(snapshotTracker.getSnapshotBytes()), new ArrayList<>(), installSnapshot.getLastIncludedIndex(), installSnapshot.getLastIncludedTerm(), @@ -558,24 +556,32 @@ public class Follower extends AbstractRaftActorBehavior { actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor()); - snapshotTracker = null; + closeSnapshotTracker(); } else { log.debug("{}: handleInstallSnapshot returning: {}", logName(), reply); sender.tell(reply, actor()); } - } catch (SnapshotTracker.InvalidChunkException | IOException e) { + } catch (IOException e) { log.debug("{}: Exception in InstallSnapshot of follower", logName(), e); sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor()); - snapshotTracker = null; + closeSnapshotTracker(); + } + } + + private void closeSnapshotTracker() { + if (snapshotTracker != null) { + snapshotTracker.close(); + snapshotTracker = null; } } @Override public void close() { + closeSnapshotTracker(); stopElection(); } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java index e0d76a54a4..5d47dbd02e 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java @@ -7,7 +7,10 @@ */ package org.opendaylight.controller.cluster.raft.behaviors; -import com.google.protobuf.ByteString; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; +import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +18,7 @@ import org.slf4j.LoggerFactory; /** * Encapsulates the leader state and logic for sending snapshot chunks to a follower. */ -public final class LeaderInstallSnapshotState { +public final class LeaderInstallSnapshotState implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class); // The index of the first chunk that is sent when installing a snapshot @@ -29,7 +32,7 @@ public final class LeaderInstallSnapshotState { private final int snapshotChunkSize; private final String logName; - private ByteString snapshotBytes; + private ByteSource snapshotBytes; private int offset = 0; // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset private int replyReceivedForOffset = -1; @@ -39,26 +42,27 @@ public final class LeaderInstallSnapshotState { private int totalChunks; private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + private long snapshotSize; + private InputStream snapshotInputStream; LeaderInstallSnapshotState(int snapshotChunkSize, String logName) { this.snapshotChunkSize = snapshotChunkSize; this.logName = logName; } - ByteString getSnapshotBytes() { - return snapshotBytes; - } - - void setSnapshotBytes(ByteString snapshotBytes) { + void setSnapshotBytes(ByteSource snapshotBytes) throws IOException { if (this.snapshotBytes != null) { return; } + snapshotSize = snapshotBytes.size(); + snapshotInputStream = snapshotBytes.openStream(); + this.snapshotBytes = snapshotBytes; - int size = snapshotBytes.size(); - totalChunks = size / snapshotChunkSize + (size % snapshotChunkSize > 0 ? 1 : 0); - LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks); + totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0)); + + LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks); replyReceivedForOffset = -1; chunkIndex = FIRST_CHUNK_INDEX; @@ -110,22 +114,26 @@ public final class LeaderInstallSnapshotState { } } - byte[] getNextChunk() { - int snapshotLength = getSnapshotBytes().size(); + byte[] getNextChunk() throws IOException { int start = incrementOffset(); int size = snapshotChunkSize; - if (snapshotChunkSize > snapshotLength) { - size = snapshotLength; - } else if (start + snapshotChunkSize > snapshotLength) { - size = snapshotLength - start; + if (snapshotChunkSize > snapshotSize) { + size = (int) snapshotSize; + } else if (start + snapshotChunkSize > snapshotSize) { + size = (int) (snapshotSize - start); } byte[] nextChunk = new byte[size]; - getSnapshotBytes().copyTo(nextChunk, start, 0, size); + int numRead = snapshotInputStream.read(nextChunk); + if (numRead != size) { + throw new IOException(String.format( + "The # of bytes read from the imput stream, %d, does not match the expected # %d", numRead, size)); + } + nextChunkHashCode = Arrays.hashCode(nextChunk); LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName, - snapshotLength, start, size, nextChunkHashCode); + snapshotSize, start, size, nextChunkHashCode); return nextChunk; } @@ -133,11 +141,37 @@ public final class LeaderInstallSnapshotState { * Reset should be called when the Follower needs to be sent the snapshot from the beginning. */ void reset() { + closeStream(); + offset = 0; replyStatus = false; replyReceivedForOffset = offset; chunkIndex = FIRST_CHUNK_INDEX; lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE; + + try { + snapshotInputStream = snapshotBytes.openStream(); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void close() { + closeStream(); + snapshotBytes = null; + } + + private void closeStream() { + if (snapshotInputStream != null) { + try { + snapshotInputStream.close(); + } catch (IOException e) { + LOG.warn("{}: Error closing snapshot stream", logName); + } + + snapshotInputStream = null; + } } int getLastChunkHashCode() { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java index 3ba020b814..77c9cb5783 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java @@ -10,18 +10,22 @@ package org.opendaylight.controller.cluster.raft.behaviors; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; +import com.google.common.io.ByteSource; +import com.google.common.io.CountingOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.Arrays; import org.slf4j.Logger; /** * Helper class that maintains state for a snapshot that is being installed in chunks on a Follower. */ -public class SnapshotTracker { +class SnapshotTracker implements AutoCloseable { private final Logger log; private final int totalChunks; private final String leaderId; - private ByteString collectedChunks = ByteString.EMPTY; + private final CountingOutputStream countingStream; + private final ByteArrayOutputStream backingStream; private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1; private boolean sealed = false; private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE; @@ -30,6 +34,9 @@ public class SnapshotTracker { this.log = log; this.totalChunks = totalChunks; this.leaderId = Preconditions.checkNotNull(leaderId); + + backingStream = new ByteArrayOutputStream(); + countingStream = new CountingOutputStream(backingStream); } /** @@ -42,9 +49,9 @@ public class SnapshotTracker { * @throws InvalidChunkException if the chunk index is invalid or out of order */ boolean addChunk(int chunkIndex, byte[] chunk, Optional maybeLastChunkHashCode) - throws InvalidChunkException { + throws InvalidChunkException, IOException { log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}", - chunkIndex, lastChunkIndex, collectedChunks.size(), this.lastChunkHashCode); + chunkIndex, lastChunkIndex, countingStream.getCount(), this.lastChunkHashCode); if (sealed) { throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex @@ -61,30 +68,36 @@ public class SnapshotTracker { + maybeLastChunkHashCode.get()); } + countingStream.write(chunk); + sealed = chunkIndex == totalChunks; lastChunkIndex = chunkIndex; - collectedChunks = collectedChunks.concat(ByteString.copyFrom(chunk)); this.lastChunkHashCode = Arrays.hashCode(chunk); return sealed; } - byte[] getSnapshot() { + ByteSource getSnapshotBytes() { if (!sealed) { throw new IllegalStateException("lastChunk not received yet"); } - return collectedChunks.toByteArray(); - } - - ByteString getCollectedChunks() { - return collectedChunks; + return ByteSource.wrap(backingStream.toByteArray()); } String getLeaderId() { return leaderId; } - public static class InvalidChunkException extends Exception { + @Override + public void close() { + try { + countingStream.close(); + } catch (IOException e) { + log.warn("Error closing snapshot stream"); + } + } + + public static class InvalidChunkException extends IOException { private static final long serialVersionUID = 1L; InvalidChunkException(String message) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index 99b647b000..f8297b0aaa 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteSource; import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.ByteString; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -585,7 +586,7 @@ public class LeaderTest extends AbstractLeaderTest { -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); //send first chunk and no InstallSnapshotReply received yet @@ -924,7 +925,7 @@ public class LeaderTest extends AbstractLeaderTest { -1, null, null), ByteSource.wrap(bs.toByteArray()))); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState( actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName()); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray())); leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts); while (!fts.isLastChunk(fts.getChunkIndex())) { fts.getNextChunk(); @@ -1156,7 +1157,7 @@ public class LeaderTest extends AbstractLeaderTest { } @Test - public void testLeaderInstallSnapshotState() { + public void testLeaderInstallSnapshotState() throws IOException { logStart("testLeaderInstallSnapshotState"); Map leadersSnapshot = new HashMap<>(); @@ -1168,7 +1169,7 @@ public class LeaderTest extends AbstractLeaderTest { byte[] barray = bs.toByteArray(); LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test"); - fts.setSnapshotBytes(bs); + fts.setSnapshotBytes(ByteSource.wrap(barray)); assertEquals(bs.size(), barray.length); @@ -1192,6 +1193,7 @@ public class LeaderTest extends AbstractLeaderTest { } assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks()); + fts.close(); } @Override diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java index 7591bb51d8..281f8071d7 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java @@ -11,14 +11,14 @@ package org.opendaylight.controller.cluster.raft.behaviors; import static org.junit.Assert.assertEquals; import com.google.common.base.Optional; +import com.google.common.io.ByteSource; import com.google.protobuf.ByteString; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.ObjectOutputStream; +import java.io.Serializable; import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import org.junit.Assert; +import org.apache.commons.lang3.SerializationUtils; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -41,117 +41,67 @@ public class SnapshotTrackerTest { data.put("key2", "value2"); data.put("key3", "value3"); - byteString = toByteString(data); + byteString = ByteString.copyFrom(SerializationUtils.serialize((Serializable) data)); chunk1 = getNextChunk(byteString, 0, 10); chunk2 = getNextChunk(byteString, 10, 10); chunk3 = getNextChunk(byteString, 20, byteString.size()); } @Test - public void testAddChunk() throws SnapshotTracker.InvalidChunkException { - SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader"); + public void testAddChunks() throws IOException { + SnapshotTracker tracker = new SnapshotTracker(logger, 3, "leader"); - tracker1.addChunk(1, chunk1, Optional.absent()); - tracker1.addChunk(2, chunk2, Optional.absent()); - tracker1.addChunk(3, chunk3, Optional.absent()); + tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); + tracker.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1))); + tracker.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2))); - // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker - SnapshotTracker tracker2 = new SnapshotTracker(logger, 2, "leader"); + ByteSource snapshotBytes = tracker.getSnapshotBytes(); + assertEquals("Deserialized", data, SerializationUtils.deserialize(snapshotBytes.read())); - tracker2.addChunk(1, chunk1, Optional.absent()); - tracker2.addChunk(2, chunk2, Optional.absent()); - - try { - tracker2.addChunk(3, chunk3, Optional.absent()); - Assert.fail(); - } catch (SnapshotTracker.InvalidChunkException e) { - // expected - } - - // The first chunk's index must at least be FIRST_CHUNK_INDEX - SnapshotTracker tracker3 = new SnapshotTracker(logger, 2, "leader"); + tracker.close(); + } - try { - tracker3.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.absent()); - Assert.fail(); - } catch (SnapshotTracker.InvalidChunkException e) { - // expected + @Test(expected = SnapshotTracker.InvalidChunkException.class) + public void testAddChunkWhenAlreadySealed() throws IOException { + try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + tracker.addChunk(1, chunk1, Optional.absent()); + tracker.addChunk(2, chunk2, Optional.absent()); + tracker.addChunk(3, chunk3, Optional.absent()); } + } - // Out of sequence chunk indexes won't work - SnapshotTracker tracker4 = new SnapshotTracker(logger, 2, "leader"); - - tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); - - try { - tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 2, chunk2, Optional.absent()); - Assert.fail(); - } catch (SnapshotTracker.InvalidChunkException e) { - // expected + @Test(expected = SnapshotTracker.InvalidChunkException.class) + public void testInvalidFirstChunkIndex() throws IOException { + try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + tracker.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.absent()); } + } - // No exceptions will be thrown when invalid chunk is added with the right sequence - // If the lastChunkHashCode is missing - SnapshotTracker tracker5 = new SnapshotTracker(logger, 2, "leader"); - - tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.absent()); - // Look I can add the same chunk again - tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk1, Optional.absent()); - - // An exception will be thrown when an invalid chunk is addedd with the right sequence - // when the lastChunkHashCode is present - SnapshotTracker tracker6 = new SnapshotTracker(logger, 2, "leader"); - - tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1)); - - try { - // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777 - tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777)); - Assert.fail(); - } catch (SnapshotTracker.InvalidChunkException e) { - // expected + @Test(expected = SnapshotTracker.InvalidChunkException.class) + public void testOutOfSequenceChunk() throws IOException { + try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + tracker.addChunk(1, chunk1, Optional.absent()); + tracker.addChunk(3, chunk3, Optional.absent()); } - } - @Test - public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException { - - // Trying to get a snapshot before all chunks have been received will throw an exception - SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader"); - - tracker1.addChunk(1, chunk1, Optional.absent()); - try { - tracker1.getSnapshot(); - Assert.fail(); - } catch (IllegalStateException e) { - // expected + @Test(expected = SnapshotTracker.InvalidChunkException.class) + public void testInvalidLastChunkHashCode() throws IOException { + try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); + tracker.addChunk(2, chunk2, Optional.of(1)); } - - SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader"); - - tracker2.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); - tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1))); - tracker2.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2))); - - byte[] snapshot = tracker2.getSnapshot(); - - assertEquals(byteString, ByteString.copyFrom(snapshot)); } - @Test - public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException { - SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader"); - - ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2)); - - tracker1.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE)); - tracker1.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1))); - - assertEquals(chunks, tracker1.getCollectedChunks()); + @Test(expected = IllegalStateException.class) + public void testGetSnapshotBytesWhenNotSealed() throws IOException { + try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) { + tracker.addChunk(1, chunk1, Optional.absent()); + tracker.getSnapshotBytes(); + } } - public byte[] getNextChunk(ByteString bs, int offset, int size) { + private byte[] getNextChunk(ByteString bs, int offset, int size) { int snapshotLength = bs.size(); int start = offset; if (size > snapshotLength) { @@ -166,31 +116,4 @@ public class SnapshotTrackerTest { bs.copyTo(nextChunk, start, 0, size); return nextChunk; } - - private static ByteString toByteString(Map state) { - ByteArrayOutputStream bos = null; - ObjectOutputStream os = null; - try { - try { - bos = new ByteArrayOutputStream(); - os = new ObjectOutputStream(bos); - os.writeObject(state); - byte[] snapshotBytes = bos.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (os != null) { - os.flush(); - os.close(); - } - if (bos != null) { - bos.close(); - } - } - } catch (IOException e) { - org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e); - } - return null; - } - - } -- 2.36.6