From: Robert Varga Date: Thu, 19 Sep 2019 23:50:56 +0000 (+0200) Subject: Split up transaction chunks X-Git-Tag: release/magnesium~58 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=afe114674227071a2598dd3a3f6589a99573e075;hp=9112031a13f4bc73ea5504da290ddeca0f7d2c17 Split up transaction chunks This adds the infrastructure to prevent allocation of large byte[]s, as when those exceed 0.5-16MiB, under G1GC they end up in humongous object region. Not only that, but after some cut-off point, the copying of arrays starts to dominate performance. What we do here is ensure we always receive up to a configurable number of bytes, defaults to 256KiB, and keep those chunks in a list. This way we may end up with larger overhead, but that really is neglibeble -- even a 2GiB payload would end up using only about 8K arrays. While the input/output streams are similar to org.apache.commons.io.output.ByteArrayOutputStream, the design here is geared towards having the intermediate representation available as well as devolving to a single byte[] for memory efficiency reasons. JIRA: CONTROLLER-1920 Change-Id: I2b79a633ebf4fdf8d68d2accc644326e30b41f22 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java new file mode 100644 index 0000000000..73a7df9542 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; +import java.io.DataOutput; +import java.io.IOException; +import java.io.ObjectInput; +import java.util.ArrayList; +import java.util.List; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.opendaylight.yangtools.concepts.Immutable; + +@NonNullByDefault +final class ChunkedByteArray implements Immutable { + private final ImmutableList chunks; + private final int size; + + ChunkedByteArray(final int size, final ImmutableList chunks) { + this.size = size; + this.chunks = requireNonNull(chunks); + } + + static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize) + throws IOException { + final List chunks = new ArrayList<>(requiredChunks(size, chunkSize)); + int remaining = size; + do { + final byte[] buffer = new byte[Math.min(remaining, chunkSize)]; + in.readFully(buffer); + chunks.add(buffer); + remaining -= buffer.length; + } while (remaining != 0); + + return new ChunkedByteArray(size, ImmutableList.copyOf(chunks)); + } + + int size() { + return size; + } + + ChunkedInputStream openStream() { + return new ChunkedInputStream(size, chunks.iterator()); + } + + void copyTo(final DataOutput output) throws IOException { + for (byte[] chunk : chunks) { + output.write(chunk, 0, chunk.length); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("size", size).add("chunkCount", chunks.size()).toString(); + } + + ImmutableList getChunks() { + return chunks; + } + + private static int requiredChunks(final int size, final int chunkSize) { + final int div = size / chunkSize; + return size % chunkSize == 0 ? div : div + 1; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java new file mode 100644 index 0000000000..68a906ed54 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.util.Iterator; + +final class ChunkedInputStream extends InputStream { + private final Iterator remainingChunks; + + private byte[] currentChunk; + private int currentLimit; + private int currentOffset; + private int available; + + ChunkedInputStream(final int size, final Iterator iterator) { + remainingChunks = requireNonNull(iterator); + currentChunk = remainingChunks.next(); + currentLimit = currentChunk.length; + available = size; + } + + @Override + public int available() { + return available; + } + + @Override + public int read() { + if (currentChunk == null) { + return -1; + } + + int ret = currentChunk[currentOffset] & 0xff; + consumeBytes(1); + return ret; + } + + @Override + @SuppressWarnings("checkstyle:ParameterName") + public int read(final byte[] b) { + return read(b, 0, b.length); + } + + @Override + @SuppressWarnings("checkstyle:ParameterName") + public int read(final byte[] b, final int off, final int len) { + if (len < 0) { + throw new IndexOutOfBoundsException(); + } + if (currentChunk == null) { + return -1; + } + + final int result = Math.min(available, len); + int toOffset = off; + int toCopy = result; + + while (toCopy != 0) { + final int count = currentBytes(toCopy); + System.arraycopy(currentChunk, currentOffset, b, toOffset, count); + consumeBytes(count); + toOffset += count; + toCopy -= count; + } + + return result; + } + + @Override + @SuppressWarnings("checkstyle:ParameterName") + public long skip(final long n) { + final int result = (int) Math.min(available, n); + + int toSkip = result; + while (toSkip != 0) { + final int count = currentBytes(toSkip); + consumeBytes(count); + toSkip -= count; + } + + return result; + } + + private int currentBytes(final int desired) { + return Math.min(desired, currentLimit - currentOffset); + } + + private void consumeBytes(final int count) { + currentOffset += count; + available -= count; + + if (currentOffset == currentLimit) { + if (remainingChunks.hasNext()) { + currentChunk = remainingChunks.next(); + currentLimit = currentChunk.length; + } else { + currentChunk = null; + currentLimit = 0; + } + currentOffset = 0; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java new file mode 100644 index 0000000000..717c3d1bd9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java @@ -0,0 +1,254 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.math.IntMath.ceilingPowerOfTwo; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.Iterator; +import org.opendaylight.yangtools.concepts.Variant; + +/** + * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has + * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge + * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying). + * + *

+ * This class takes a different approach: it recognizes that result of buffering will be collected at some point, when + * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps: + *

    + *
  • Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the + * buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit + * {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data + * is introduced in large chunks via {@link #write(byte[], int, int)}.
  • + *
  • Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the + * final collection of buffers.
  • + *
+ * + *

+ * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until + * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is + * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which + * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm + * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced + * chunks is also well-controlled: + *

    + *
  • for slowly-built data, we will maintain perfect packing
  • + *
  • for fast-startup data, we will be at most one one chunk away from packing perfectly
  • + *
+ * + * @author Robert Varga + * @author Tomas Olvecky + */ +final class ChunkedOutputStream extends OutputStream { + static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( + "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024)); + private static final int MIN_ARRAY_SIZE = 32; + + // byte[] or a List + private Object result; + // Lazily-allocated to reduce pressure for single-chunk streams + private Deque prevChunks; + + private byte[] currentChunk; + private int currentOffset; + private int size; + + ChunkedOutputStream(final int requestedInitialCapacity) { + currentChunk = new byte[initialCapacity(requestedInitialCapacity)]; + } + + @Override + @SuppressWarnings("checkstyle:ParameterName") + public void write(final int b) throws IOException { + checkNotClosed(); + ensureOneByte(); + currentChunk[currentOffset] = (byte) b; + currentOffset++; + size++; + } + + @Override + @SuppressWarnings("checkstyle:ParameterName") + public void write(final byte[] b, final int off, final int len) throws IOException { + if (len < 0) { + throw new IndexOutOfBoundsException(); + } + checkNotClosed(); + + int fromOffset = off; + int toCopy = len; + + while (toCopy != 0) { + final int count = ensureMoreBytes(toCopy); + System.arraycopy(b, fromOffset, currentChunk, currentOffset, count); + currentOffset += count; + size += count; + fromOffset += count; + toCopy -= count; + } + } + + @Override + public void close() { + if (result == null) { + result = computeResult(); + prevChunks = null; + currentChunk = null; + } + } + + int size() { + return size; + } + + ChunkedByteArray toChunkedByteArray() { + checkClosed(); + return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result) + : (ImmutableList) result); + } + + Variant toVariant() { + checkClosed(); + return result instanceof byte[] ? Variant.ofFirst((byte[]) result) + : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList) result)); + } + + private Object computeResult() { + if (prevChunks == null) { + // Simple case: it's only the current buffer, return that + return trimChunk(currentChunk, currentOffset); + } + if (size <= MAX_ARRAY_SIZE) { + // We have collected less than full chunk of data, let's have just one chunk ... + final byte[] singleChunk; + if (currentOffset == 0 && prevChunks.size() == 1) { + // ... which we have readily available + return prevChunks.getFirst(); + } + + // ... which we need to collect + singleChunk = new byte[size]; + int offset = 0; + for (byte[] chunk : prevChunks) { + System.arraycopy(chunk, 0, singleChunk, offset, chunk.length); + offset += chunk.length; + } + System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset); + return singleChunk; + } + + // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE, + // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to + // end up trimming this array. + int headSize = 0; + int headCount = 0; + final Iterator it = prevChunks.iterator(); + do { + final byte[] chunk = it.next(); + if (chunk.length == MAX_ARRAY_SIZE) { + break; + } + + headSize += chunk.length; + headCount++; + } while (it.hasNext()); + + // Compact initial chunks into a single one + final byte[] head = new byte[headSize]; + int offset = 0; + for (int i = 0; i < headCount; ++i) { + final byte[] chunk = prevChunks.removeFirst(); + System.arraycopy(chunk, 0, head, offset, chunk.length); + offset += chunk.length; + } + verify(offset == head.length); + prevChunks.addFirst(head); + + // Now append the current chunk if need be, potentially trimming it + if (currentOffset == 0) { + return ImmutableList.copyOf(prevChunks); + } + + final Builder builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1); + builder.addAll(prevChunks); + builder.add(trimChunk(currentChunk, currentOffset)); + return builder.build(); + } + + // Ensure a single byte + private void ensureOneByte() { + if (currentChunk.length == currentOffset) { + nextChunk(nextChunkSize(currentChunk.length)); + } + } + + // Ensure more than one byte, returns the number of bytes available + private int ensureMoreBytes(final int requested) { + int available = currentChunk.length - currentOffset; + if (available == 0) { + nextChunk(nextChunkSize(currentChunk.length, requested)); + available = currentChunk.length; + } + final int count = Math.min(requested, available); + verify(count > 0); + return count; + } + + private void nextChunk(final int chunkSize) { + if (prevChunks == null) { + prevChunks = new ArrayDeque<>(); + } + + prevChunks.addLast(currentChunk); + currentChunk = new byte[chunkSize]; + currentOffset = 0; + } + + private void checkClosed() { + checkState(result != null, "Stream has not been closed yet"); + } + + private void checkNotClosed() throws IOException { + if (result != null) { + throw new IOException("Stream is already closed"); + } + } + + private static int nextChunkSize(final int currentSize, final int requested) { + return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE + ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested)); + } + + private static int nextChunkSize(final int currentSize) { + return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE; + } + + private static int initialCapacity(final int requestedSize) { + if (requestedSize < MIN_ARRAY_SIZE) { + return MIN_ARRAY_SIZE; + } + if (requestedSize > MAX_ARRAY_SIZE) { + return MAX_ARRAY_SIZE; + } + return ceilingPowerOfTwo(requestedSize); + } + + private static byte[] trimChunk(final byte[] chunk, final int length) { + return chunk.length == length ? chunk : Arrays.copyOf(chunk, length); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index fb66581c28..73bdd6f31b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -7,22 +7,28 @@ */ package org.opendaylight.controller.cluster.datastore.persisted; +import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; +import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; +import java.io.StreamCorruptedException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Variant; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; @@ -36,61 +42,26 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public final class CommitTransactionPayload extends Payload implements Serializable { +public abstract class CommitTransactionPayload extends Payload implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); - - private static final class Proxy implements Externalizable { - private static final long serialVersionUID = 1L; - private byte[] serialized; - - // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't - // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. - @SuppressWarnings("checkstyle:RedundantModifier") - public Proxy() { - // For Externalizable - } - - Proxy(final byte[] serialized) { - this.serialized = requireNonNull(serialized); - } - - @Override - public void writeExternal(final ObjectOutput out) throws IOException { - out.writeInt(serialized.length); - out.write(serialized); - } - - @Override - public void readExternal(final ObjectInput in) throws IOException { - final int length = in.readInt(); - serialized = new byte[length]; - in.readFully(serialized); - } - - private Object readResolve() { - return new CommitTransactionPayload(serialized); - } - } - private static final long serialVersionUID = 1L; - private final byte[] serialized; + CommitTransactionPayload() { - CommitTransactionPayload(final byte[] serialized) { - this.serialized = requireNonNull(serialized); } public static CommitTransactionPayload create(final TransactionIdentifier transactionId, final DataTreeCandidate candidate, final int initialSerializedBufferCapacity) throws IOException { - final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity); - transactionId.writeTo(out); - DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate); - final byte[] serialized = out.toByteArray(); - LOG.debug("Initial buffer capacity {}, actual serialized size {}", - initialSerializedBufferCapacity, serialized.length); + final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity); + try (DataOutputStream dos = new DataOutputStream(cos)) { + transactionId.writeTo(dos); + DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, candidate); + } - return new CommitTransactionPayload(serialized); + final Variant source = cos.toVariant(); + LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size()); + return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond()); } @VisibleForTesting @@ -103,19 +74,110 @@ public final class CommitTransactionPayload extends Payload implements Serializa return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); } - public Entry getCandidate(final ReusableStreamReceiver receiver) - throws IOException { - final DataInput in = ByteStreams.newDataInput(serialized); + public final Entry getCandidate( + final ReusableStreamReceiver receiver) throws IOException { + final DataInput in = newDataInput(); return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); } - @Override - public int size() { - return serialized.length; + abstract void writeBytes(ObjectOutput out) throws IOException; + + abstract DataInput newDataInput(); + + final Object writeReplace() { + return new Proxy(this); } - private Object writeReplace() { - return new Proxy(serialized); + private static final class Simple extends CommitTransactionPayload { + private static final long serialVersionUID = 1L; + + private final byte[] serialized; + + Simple(final byte[] serialized) { + this.serialized = requireNonNull(serialized); + } + + @Override + public int size() { + return serialized.length; + } + + @Override + DataInput newDataInput() { + return ByteStreams.newDataInput(serialized); + } + + @Override + void writeBytes(final ObjectOutput out) throws IOException { + out.write(serialized); + } + } + + private static final class Chunked extends CommitTransactionPayload { + private static final long serialVersionUID = 1L; + + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy") + private final ChunkedByteArray source; + + Chunked(final ChunkedByteArray source) { + this.source = requireNonNull(source); + } + + @Override + void writeBytes(final ObjectOutput out) throws IOException { + source.copyTo(out); + } + + @Override + public int size() { + return source.size(); + } + + @Override + DataInput newDataInput() { + return new DataInputStream(source.openStream()); + } + } + + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + + private CommitTransactionPayload payload; + + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } + + Proxy(final CommitTransactionPayload payload) { + this.payload = requireNonNull(payload); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeInt(payload.size()); + payload.writeBytes(out); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException { + final int length = in.readInt(); + if (length < 0) { + throw new StreamCorruptedException("Invalid payload length " + length); + } else if (length < MAX_ARRAY_SIZE) { + final byte[] serialized = new byte[length]; + in.readFully(serialized); + payload = new Simple(serialized); + } else { + payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE)); + } + } + + private Object readResolve() { + return verifyNotNull(payload); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java new file mode 100644 index 0000000000..16f7bd830b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.List; +import org.junit.Test; + +public class ChunkedOutputStreamTest { + private static final int INITIAL_SIZE = 256; + + private final ChunkedOutputStream stream = new ChunkedOutputStream(INITIAL_SIZE); + + @Test + public void testBasicWrite() throws IOException { + for (int i = 0; i < INITIAL_SIZE; ++i) { + stream.write(i); + } + + final byte[] chunk = assertFinishedStream(INITIAL_SIZE, 1).get(0); + assertEquals(INITIAL_SIZE, chunk.length); + for (int i = 0; i < INITIAL_SIZE; ++i) { + assertEquals((byte) i, chunk[i]); + } + } + + @Test + public void testBasicLargeWrite() throws IOException { + final byte[] array = createArray(INITIAL_SIZE); + stream.write(array); + final byte[] chunk = assertFinishedStream(INITIAL_SIZE, 1).get(0); + assertArrayEquals(array, chunk); + } + + @Test + public void testGrowWrite() throws IOException { + for (int i = 0; i < INITIAL_SIZE * 2; ++i) { + stream.write(i); + } + + final byte[] chunk = assertFinishedStream(INITIAL_SIZE * 2, 1).get(0); + assertEquals(INITIAL_SIZE * 2, chunk.length); + for (int i = 0; i < INITIAL_SIZE * 2; ++i) { + assertEquals((byte) i, chunk[i]); + } + } + + @Test + public void testGrowLargeWrite() throws IOException { + final byte[] array = createArray(INITIAL_SIZE * 2); + stream.write(array); + final byte[] chunk = assertFinishedStream(INITIAL_SIZE * 2, 1).get(0); + assertArrayEquals(array, chunk); + } + + @Test + public void testTwoChunksWrite() throws IOException { + int size = ChunkedOutputStream.MAX_ARRAY_SIZE + 1; + for (int i = 0; i < size; ++i) { + stream.write(i); + } + + int counter = 0; + for (byte[] chunk: assertFinishedStream(size, 2)) { + for (byte actual: chunk) { + assertEquals((byte) counter++, actual); + } + } + } + + private List assertFinishedStream(final int expectedSize, final int expectedChunks) { + stream.close(); + final ChunkedByteArray array = stream.toChunkedByteArray(); + assertEquals(expectedSize, array.size()); + + final List chunks = array.getChunks(); + assertEquals(expectedChunks, chunks.size()); + return chunks; + } + + private static byte[] createArray(final int size) { + final byte[] array = new byte[size]; + for (int i = 0; i < size; ++i) { + array[i] = (byte) i; + } + return array; + } +}