Bug 7521: Convert install snapshot chunking to use streams 44/50744/7
authorTom Pantelis <tpanteli@brocade.com>
Fri, 20 Jan 2017 20:32:37 +0000 (15:32 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Fri, 10 Feb 2017 12:16:15 +0000 (12:16 +0000)
On the leader side, converted LeaderInstallSnapshotState to use
the InputStream from the ByteSource instead of a ByteString to
chunk the data.

On the follower side, converted the SnapshotTracker, which is used
to reassemble the install snapshot chunks, to write the chunks to
an OutputStream instead of a ByteString. Currently a
ByteArrayOutputStream is used by will be changed to a
FileBackedOutputStream in a subsequent patch.

Change-Id: I7a16ad5d44a530e260aa332d91145fbc3fb95f5f
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderInstallSnapshotState.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTracker.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/SnapshotTrackerTest.java

index b78c4fac223dcda8d98a8571dfa9cf4a33f28ba6..c89996730694844cb2e35bb7ab333898e47e4345 100644 (file)
@@ -200,6 +200,8 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
 
     @Override
     public void clearLeaderInstallSnapshotState() {
+        Preconditions.checkState(installSnapshotState != null);
+        installSnapshotState.close();
         installSnapshotState = null;
     }
 
index 548b920fe771f183b904e2d5e2e65d7001cb746e..12cbcc0df66aa635315436ab04a4cd5e00bfa5cd 100644 (file)
@@ -14,8 +14,8 @@ import akka.actor.Cancellable;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.io.ByteSource;
-import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -789,35 +789,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState);
             }
 
-            // Ensure the snapshot bytes are set - this is a no-op.
-            installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
-
-            byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
+            try {
+                // Ensure the snapshot bytes are set - this is a no-op.
+                installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes());
 
-            log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
-                    nextSnapshotChunk.length);
+                byte[] nextSnapshotChunk = installSnapshotState.getNextChunk();
 
-            int nextChunkIndex = installSnapshotState.incrementChunkIndex();
-            Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
-            if (installSnapshotState.isLastChunk(nextChunkIndex)) {
-                serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
-            }
+                log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(),
+                        nextSnapshotChunk.length);
 
-            followerActor.tell(
-                new InstallSnapshot(currentTerm(), context.getId(),
-                    snapshotHolder.get().getLastIncludedIndex(),
-                    snapshotHolder.get().getLastIncludedTerm(),
-                    nextSnapshotChunk,
-                    nextChunkIndex,
-                    installSnapshotState.getTotalChunks(),
-                    Optional.of(installSnapshotState.getLastChunkHashCode()),
-                    serverConfig
-                ).toSerializable(followerLogInfo.getRaftVersion()),
-                actor()
-            );
+                int nextChunkIndex = installSnapshotState.incrementChunkIndex();
+                Optional<ServerConfigurationPayload> serverConfig = Optional.absent();
+                if (installSnapshotState.isLastChunk(nextChunkIndex)) {
+                    serverConfig = Optional.fromNullable(context.getPeerServerInfo(true));
+                }
 
-            log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
-                    installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+                followerActor.tell(
+                    new InstallSnapshot(currentTerm(), context.getId(),
+                        snapshotHolder.get().getLastIncludedIndex(),
+                        snapshotHolder.get().getLastIncludedTerm(),
+                        nextSnapshotChunk,
+                        nextChunkIndex,
+                        installSnapshotState.getTotalChunks(),
+                        Optional.of(installSnapshotState.getLastChunkHashCode()),
+                        serverConfig
+                    ).toSerializable(followerLogInfo.getRaftVersion()),
+                    actor()
+                );
+
+                log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+                        installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
+            } catch (IOException e) {
+                throw Throwables.propagate(e);
+            }
         }
     }
 
@@ -912,16 +916,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     static class SnapshotHolder {
         private final long lastIncludedTerm;
         private final long lastIncludedIndex;
-        private final ByteString snapshotBytes;
+        private final ByteSource snapshotBytes;
 
         SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
             this.lastIncludedTerm = snapshot.getLastAppliedTerm();
             this.lastIncludedIndex = snapshot.getLastAppliedIndex();
-            try {
-                this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read());
-            } catch (IOException e) {
-                throw new RuntimeException("Error reading state", e);
-            }
+            this.snapshotBytes = snapshotBytes;
         }
 
         long getLastIncludedTerm() {
@@ -932,7 +932,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             return lastIncludedIndex;
         }
 
-        ByteString getSnapshotBytes() {
+        ByteSource getSnapshotBytes() {
             return snapshotBytes;
         }
     }
index 727d6a3131682281ea9825264dc9ac380d43597b..2cc2c261bb1d77f3699d5f41d233020b5e4e8b65 100644 (file)
@@ -18,7 +18,6 @@ import akka.cluster.MemberStatus;
 import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
-import com.google.common.io.ByteSource;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Optional;
@@ -530,9 +529,8 @@ public class Follower extends AbstractRaftActorBehavior {
 
             if (snapshotTracker.addChunk(installSnapshot.getChunkIndex(), installSnapshot.getData(),
                     installSnapshot.getLastChunkHashCode())) {
-                ByteSource snapshotBytes = ByteSource.wrap(snapshotTracker.getSnapshot());
                 Snapshot snapshot = Snapshot.create(
-                        context.getSnapshotManager().convertSnapshot(snapshotBytes),
+                        context.getSnapshotManager().convertSnapshot(snapshotTracker.getSnapshotBytes()),
                         new ArrayList<>(),
                         installSnapshot.getLastIncludedIndex(),
                         installSnapshot.getLastIncludedTerm(),
@@ -558,24 +556,32 @@ public class Follower extends AbstractRaftActorBehavior {
 
                 actor().tell(new ApplySnapshot(snapshot, applySnapshotCallback), actor());
 
-                snapshotTracker = null;
+                closeSnapshotTracker();
             } else {
                 log.debug("{}: handleInstallSnapshot returning: {}", logName(), reply);
 
                 sender.tell(reply, actor());
             }
-        } catch (SnapshotTracker.InvalidChunkException | IOException e) {
+        } catch (IOException e) {
             log.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
 
             sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
                     -1, false), actor());
-            snapshotTracker = null;
 
+            closeSnapshotTracker();
+        }
+    }
+
+    private void closeSnapshotTracker() {
+        if (snapshotTracker != null) {
+            snapshotTracker.close();
+            snapshotTracker = null;
         }
     }
 
     @Override
     public void close() {
+        closeSnapshotTracker();
         stopElection();
     }
 
index e0d76a54a41b2a5bcd2afd14f2e1855b7595b163..5d47dbd02ec9754e94e5bf0f3c81e1a769caf6ba 100644 (file)
@@ -7,7 +7,10 @@
  */
 package org.opendaylight.controller.cluster.raft.behaviors;
 
-import com.google.protobuf.ByteString;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -15,7 +18,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Encapsulates the leader state and logic for sending snapshot chunks to a follower.
  */
-public final class LeaderInstallSnapshotState {
+public final class LeaderInstallSnapshotState implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(LeaderInstallSnapshotState.class);
 
     // The index of the first chunk that is sent when installing a snapshot
@@ -29,7 +32,7 @@ public final class LeaderInstallSnapshotState {
 
     private final int snapshotChunkSize;
     private final String logName;
-    private ByteString snapshotBytes;
+    private ByteSource snapshotBytes;
     private int offset = 0;
     // the next snapshot chunk is sent only if the replyReceivedForOffset matches offset
     private int replyReceivedForOffset = -1;
@@ -39,26 +42,27 @@ public final class LeaderInstallSnapshotState {
     private int totalChunks;
     private int lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
     private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+    private long snapshotSize;
+    private InputStream snapshotInputStream;
 
     LeaderInstallSnapshotState(int snapshotChunkSize, String logName) {
         this.snapshotChunkSize = snapshotChunkSize;
         this.logName = logName;
     }
 
-    ByteString getSnapshotBytes() {
-        return snapshotBytes;
-    }
-
-    void setSnapshotBytes(ByteString snapshotBytes) {
+    void setSnapshotBytes(ByteSource snapshotBytes) throws IOException {
         if (this.snapshotBytes != null) {
             return;
         }
 
+        snapshotSize = snapshotBytes.size();
+        snapshotInputStream = snapshotBytes.openStream();
+
         this.snapshotBytes = snapshotBytes;
-        int size = snapshotBytes.size();
-        totalChunks = size / snapshotChunkSize + (size % snapshotChunkSize > 0 ? 1 : 0);
 
-        LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, size, totalChunks);
+        totalChunks = (int) (snapshotSize / snapshotChunkSize + (snapshotSize % snapshotChunkSize > 0 ? 1 : 0));
+
+        LOG.debug("{}: Snapshot {} bytes, total chunks to send: {}", logName, snapshotSize, totalChunks);
 
         replyReceivedForOffset = -1;
         chunkIndex = FIRST_CHUNK_INDEX;
@@ -110,22 +114,26 @@ public final class LeaderInstallSnapshotState {
         }
     }
 
-    byte[] getNextChunk() {
-        int snapshotLength = getSnapshotBytes().size();
+    byte[] getNextChunk() throws IOException {
         int start = incrementOffset();
         int size = snapshotChunkSize;
-        if (snapshotChunkSize > snapshotLength) {
-            size = snapshotLength;
-        } else if (start + snapshotChunkSize > snapshotLength) {
-            size = snapshotLength - start;
+        if (snapshotChunkSize > snapshotSize) {
+            size = (int) snapshotSize;
+        } else if (start + snapshotChunkSize > snapshotSize) {
+            size = (int) (snapshotSize - start);
         }
 
         byte[] nextChunk = new byte[size];
-        getSnapshotBytes().copyTo(nextChunk, start, 0, size);
+        int numRead = snapshotInputStream.read(nextChunk);
+        if (numRead != size) {
+            throw new IOException(String.format(
+                    "The # of bytes read from the imput stream, %d, does not match the expected # %d", numRead, size));
+        }
+
         nextChunkHashCode = Arrays.hashCode(nextChunk);
 
         LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName,
-                snapshotLength, start, size, nextChunkHashCode);
+                snapshotSize, start, size, nextChunkHashCode);
         return nextChunk;
     }
 
@@ -133,11 +141,37 @@ public final class LeaderInstallSnapshotState {
      * Reset should be called when the Follower needs to be sent the snapshot from the beginning.
      */
     void reset() {
+        closeStream();
+
         offset = 0;
         replyStatus = false;
         replyReceivedForOffset = offset;
         chunkIndex = FIRST_CHUNK_INDEX;
         lastChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
+
+        try {
+            snapshotInputStream = snapshotBytes.openStream();
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        closeStream();
+        snapshotBytes = null;
+    }
+
+    private void closeStream() {
+        if (snapshotInputStream != null) {
+            try {
+                snapshotInputStream.close();
+            } catch (IOException e) {
+                LOG.warn("{}: Error closing snapshot stream", logName);
+            }
+
+            snapshotInputStream = null;
+        }
     }
 
     int getLastChunkHashCode() {
index 3ba020b8143aa69f4c106a5c53f150a117bb1d70..77c9cb5783f4dd59c57246492d02071386107351 100644 (file)
@@ -10,18 +10,22 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import com.google.common.io.ByteSource;
+import com.google.common.io.CountingOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Arrays;
 import org.slf4j.Logger;
 
 /**
  * Helper class that maintains state for a snapshot that is being installed in chunks on a Follower.
  */
-public class SnapshotTracker {
+class SnapshotTracker implements AutoCloseable {
     private final Logger log;
     private final int totalChunks;
     private final String leaderId;
-    private ByteString collectedChunks = ByteString.EMPTY;
+    private final CountingOutputStream countingStream;
+    private final ByteArrayOutputStream backingStream;
     private int lastChunkIndex = LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1;
     private boolean sealed = false;
     private int lastChunkHashCode = LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE;
@@ -30,6 +34,9 @@ public class SnapshotTracker {
         this.log = log;
         this.totalChunks = totalChunks;
         this.leaderId = Preconditions.checkNotNull(leaderId);
+
+        backingStream = new ByteArrayOutputStream();
+        countingStream = new CountingOutputStream(backingStream);
     }
 
     /**
@@ -42,9 +49,9 @@ public class SnapshotTracker {
      * @throws InvalidChunkException if the chunk index is invalid or out of order
      */
     boolean addChunk(int chunkIndex, byte[] chunk, Optional<Integer> maybeLastChunkHashCode)
-            throws InvalidChunkException {
+            throws InvalidChunkException, IOException {
         log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
-                chunkIndex, lastChunkIndex, collectedChunks.size(), this.lastChunkHashCode);
+                chunkIndex, lastChunkIndex, countingStream.getCount(), this.lastChunkHashCode);
 
         if (sealed) {
             throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex
@@ -61,30 +68,36 @@ public class SnapshotTracker {
                     + maybeLastChunkHashCode.get());
         }
 
+        countingStream.write(chunk);
+
         sealed = chunkIndex == totalChunks;
         lastChunkIndex = chunkIndex;
-        collectedChunks = collectedChunks.concat(ByteString.copyFrom(chunk));
         this.lastChunkHashCode = Arrays.hashCode(chunk);
         return sealed;
     }
 
-    byte[] getSnapshot() {
+    ByteSource getSnapshotBytes() {
         if (!sealed) {
             throw new IllegalStateException("lastChunk not received yet");
         }
 
-        return collectedChunks.toByteArray();
-    }
-
-    ByteString getCollectedChunks() {
-        return collectedChunks;
+        return ByteSource.wrap(backingStream.toByteArray());
     }
 
     String getLeaderId() {
         return leaderId;
     }
 
-    public static class InvalidChunkException extends Exception {
+    @Override
+    public void close() {
+        try {
+            countingStream.close();
+        } catch (IOException e) {
+            log.warn("Error closing snapshot stream");
+        }
+    }
+
+    public static class InvalidChunkException extends IOException {
         private static final long serialVersionUID = 1L;
 
         InvalidChunkException(String message) {
index 99b647b000f3851e1777396dc56e196430dba562..f8297b0aaae061f45fd66199fda11762ef187f7a 100644 (file)
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -585,7 +586,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
 
         //send first chunk and no InstallSnapshotReply received yet
@@ -924,7 +925,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
                 -1, null, null), ByteSource.wrap(bs.toByteArray())));
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(
                 actorContext.getConfigParams().getSnapshotChunkSize(), leader.logName());
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(bs.toByteArray()));
         leader.getFollower(FOLLOWER_ID).setLeaderInstallSnapshotState(fts);
         while (!fts.isLastChunk(fts.getChunkIndex())) {
             fts.getNextChunk();
@@ -1156,7 +1157,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
     }
 
     @Test
-    public void testLeaderInstallSnapshotState() {
+    public void testLeaderInstallSnapshotState() throws IOException {
         logStart("testLeaderInstallSnapshotState");
 
         Map<String, String> leadersSnapshot = new HashMap<>();
@@ -1168,7 +1169,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         byte[] barray = bs.toByteArray();
 
         LeaderInstallSnapshotState fts = new LeaderInstallSnapshotState(50, "test");
-        fts.setSnapshotBytes(bs);
+        fts.setSnapshotBytes(ByteSource.wrap(barray));
 
         assertEquals(bs.size(), barray.length);
 
@@ -1192,6 +1193,7 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         }
 
         assertEquals("totalChunks not matching", chunkIndex, fts.getTotalChunks());
+        fts.close();
     }
 
     @Override
index 7591bb51d8da832a291d95ddc06146e9469507fc..281f8071d7cbc8c186ca423b1c4d7f5341b670c0 100644 (file)
@@ -11,14 +11,14 @@ package org.opendaylight.controller.cluster.raft.behaviors;
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.base.Optional;
+import com.google.common.io.ByteSource;
 import com.google.protobuf.ByteString;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import org.junit.Assert;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -41,117 +41,67 @@ public class SnapshotTrackerTest {
         data.put("key2", "value2");
         data.put("key3", "value3");
 
-        byteString = toByteString(data);
+        byteString = ByteString.copyFrom(SerializationUtils.serialize((Serializable) data));
         chunk1 = getNextChunk(byteString, 0, 10);
         chunk2 = getNextChunk(byteString, 10, 10);
         chunk3 = getNextChunk(byteString, 20, byteString.size());
     }
 
     @Test
-    public void testAddChunk() throws SnapshotTracker.InvalidChunkException {
-        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
+    public void testAddChunks() throws IOException {
+        SnapshotTracker tracker = new SnapshotTracker(logger, 3, "leader");
 
-        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
-        tracker1.addChunk(2, chunk2, Optional.<Integer>absent());
-        tracker1.addChunk(3, chunk3, Optional.<Integer>absent());
+        tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
+        tracker.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
+        tracker.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
 
-        // Verify that an InvalidChunkException is thrown when we try to add a chunk to a sealed tracker
-        SnapshotTracker tracker2 = new SnapshotTracker(logger, 2, "leader");
+        ByteSource snapshotBytes = tracker.getSnapshotBytes();
+        assertEquals("Deserialized", data, SerializationUtils.deserialize(snapshotBytes.read()));
 
-        tracker2.addChunk(1, chunk1, Optional.<Integer>absent());
-        tracker2.addChunk(2, chunk2, Optional.<Integer>absent());
-
-        try {
-            tracker2.addChunk(3, chunk3, Optional.<Integer>absent());
-            Assert.fail();
-        } catch (SnapshotTracker.InvalidChunkException e) {
-            // expected
-        }
-
-        // The first chunk's index must at least be FIRST_CHUNK_INDEX
-        SnapshotTracker tracker3 = new SnapshotTracker(logger, 2, "leader");
+        tracker.close();
+    }
 
-        try {
-            tracker3.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
-            Assert.fail();
-        } catch (SnapshotTracker.InvalidChunkException e) {
-            // expected
+    @Test(expected = SnapshotTracker.InvalidChunkException.class)
+    public void testAddChunkWhenAlreadySealed() throws IOException {
+        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+            tracker.addChunk(1, chunk1, Optional.<Integer>absent());
+            tracker.addChunk(2, chunk2, Optional.<Integer>absent());
+            tracker.addChunk(3, chunk3, Optional.<Integer>absent());
         }
+    }
 
-        // Out of sequence chunk indexes won't work
-        SnapshotTracker tracker4 = new SnapshotTracker(logger, 2, "leader");
-
-        tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
-
-        try {
-            tracker4.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 2, chunk2, Optional.<Integer>absent());
-            Assert.fail();
-        } catch (SnapshotTracker.InvalidChunkException e) {
-            // expected
+    @Test(expected = SnapshotTracker.InvalidChunkException.class)
+    public void testInvalidFirstChunkIndex() throws IOException {
+        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+            tracker.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX - 1, chunk1, Optional.<Integer>absent());
         }
+    }
 
-        // No exceptions will be thrown when invalid chunk is added with the right sequence
-        // If the lastChunkHashCode is missing
-        SnapshotTracker tracker5 = new SnapshotTracker(logger, 2, "leader");
-
-        tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.<Integer>absent());
-        // Look I can add the same chunk again
-        tracker5.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk1, Optional.<Integer>absent());
-
-        // An exception will be thrown when an invalid chunk is addedd with the right sequence
-        // when the lastChunkHashCode is present
-        SnapshotTracker tracker6 = new SnapshotTracker(logger, 2, "leader");
-
-        tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX, chunk1, Optional.of(-1));
-
-        try {
-            // Here we add a second chunk and tell addChunk that the previous chunk had a hash code 777
-            tracker6.addChunk(LeaderInstallSnapshotState.FIRST_CHUNK_INDEX + 1, chunk2, Optional.of(777));
-            Assert.fail();
-        } catch (SnapshotTracker.InvalidChunkException e) {
-            // expected
+    @Test(expected = SnapshotTracker.InvalidChunkException.class)
+    public void testOutOfSequenceChunk() throws IOException {
+        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+            tracker.addChunk(1, chunk1, Optional.<Integer>absent());
+            tracker.addChunk(3, chunk3, Optional.<Integer>absent());
         }
-
     }
 
-    @Test
-    public void testGetSnapShot() throws SnapshotTracker.InvalidChunkException {
-
-        // Trying to get a snapshot before all chunks have been received will throw an exception
-        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
-
-        tracker1.addChunk(1, chunk1, Optional.<Integer>absent());
-        try {
-            tracker1.getSnapshot();
-            Assert.fail();
-        } catch (IllegalStateException e) {
-            // expected
+    @Test(expected = SnapshotTracker.InvalidChunkException.class)
+    public void testInvalidLastChunkHashCode() throws IOException {
+        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+            tracker.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
+            tracker.addChunk(2, chunk2, Optional.of(1));
         }
-
-        SnapshotTracker tracker2 = new SnapshotTracker(logger, 3, "leader");
-
-        tracker2.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
-        tracker2.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
-        tracker2.addChunk(3, chunk3, Optional.of(Arrays.hashCode(chunk2)));
-
-        byte[] snapshot = tracker2.getSnapshot();
-
-        assertEquals(byteString, ByteString.copyFrom(snapshot));
     }
 
-    @Test
-    public void testGetCollectedChunks() throws SnapshotTracker.InvalidChunkException {
-        SnapshotTracker tracker1 = new SnapshotTracker(logger, 5, "leader");
-
-        ByteString chunks = ByteString.copyFrom(chunk1).concat(ByteString.copyFrom(chunk2));
-
-        tracker1.addChunk(1, chunk1, Optional.of(LeaderInstallSnapshotState.INITIAL_LAST_CHUNK_HASH_CODE));
-        tracker1.addChunk(2, chunk2, Optional.of(Arrays.hashCode(chunk1)));
-
-        assertEquals(chunks, tracker1.getCollectedChunks());
+    @Test(expected = IllegalStateException.class)
+    public void testGetSnapshotBytesWhenNotSealed() throws IOException {
+        try (SnapshotTracker tracker = new SnapshotTracker(logger, 2, "leader")) {
+            tracker.addChunk(1, chunk1, Optional.<Integer>absent());
+            tracker.getSnapshotBytes();
+        }
     }
 
-    public byte[] getNextChunk(ByteString bs, int offset, int size) {
+    private byte[] getNextChunk(ByteString bs, int offset, int size) {
         int snapshotLength = bs.size();
         int start = offset;
         if (size > snapshotLength) {
@@ -166,31 +116,4 @@ public class SnapshotTrackerTest {
         bs.copyTo(nextChunk, start, 0, size);
         return nextChunk;
     }
-
-    private static ByteString toByteString(Map<String, String> state) {
-        ByteArrayOutputStream bos = null;
-        ObjectOutputStream os = null;
-        try {
-            try {
-                bos = new ByteArrayOutputStream();
-                os = new ObjectOutputStream(bos);
-                os.writeObject(state);
-                byte[] snapshotBytes = bos.toByteArray();
-                return ByteString.copyFrom(snapshotBytes);
-            } finally {
-                if (os != null) {
-                    os.flush();
-                    os.close();
-                }
-                if (bos != null) {
-                    bos.close();
-                }
-            }
-        } catch (IOException e) {
-            org.junit.Assert.fail("IOException in converting Hashmap to Bytestring:" + e);
-        }
-        return null;
-    }
-
-
 }