--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.util.ArrayList;
+import java.util.List;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.opendaylight.yangtools.concepts.Immutable;
+
+@NonNullByDefault
+final class ChunkedByteArray implements Immutable {
+ private final ImmutableList<byte[]> chunks;
+ private final int size;
+
+ ChunkedByteArray(final int size, final ImmutableList<byte[]> chunks) {
+ this.size = size;
+ this.chunks = requireNonNull(chunks);
+ }
+
+ static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize)
+ throws IOException {
+ final List<byte[]> chunks = new ArrayList<>(requiredChunks(size, chunkSize));
+ int remaining = size;
+ do {
+ final byte[] buffer = new byte[Math.min(remaining, chunkSize)];
+ in.readFully(buffer);
+ chunks.add(buffer);
+ remaining -= buffer.length;
+ } while (remaining != 0);
+
+ return new ChunkedByteArray(size, ImmutableList.copyOf(chunks));
+ }
+
+ int size() {
+ return size;
+ }
+
+ ChunkedInputStream openStream() {
+ return new ChunkedInputStream(size, chunks.iterator());
+ }
+
+ void copyTo(final DataOutput output) throws IOException {
+ for (byte[] chunk : chunks) {
+ output.write(chunk, 0, chunk.length);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("size", size).add("chunkCount", chunks.size()).toString();
+ }
+
+ ImmutableList<byte[]> getChunks() {
+ return chunks;
+ }
+
+ private static int requiredChunks(final int size, final int chunkSize) {
+ final int div = size / chunkSize;
+ return size % chunkSize == 0 ? div : div + 1;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.util.Iterator;
+
+final class ChunkedInputStream extends InputStream {
+ private final Iterator<byte[]> remainingChunks;
+
+ private byte[] currentChunk;
+ private int currentLimit;
+ private int currentOffset;
+ private int available;
+
+ ChunkedInputStream(final int size, final Iterator<byte[]> iterator) {
+ remainingChunks = requireNonNull(iterator);
+ currentChunk = remainingChunks.next();
+ currentLimit = currentChunk.length;
+ available = size;
+ }
+
+ @Override
+ public int available() {
+ return available;
+ }
+
+ @Override
+ public int read() {
+ if (currentChunk == null) {
+ return -1;
+ }
+
+ int ret = currentChunk[currentOffset] & 0xff;
+ consumeBytes(1);
+ return ret;
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:ParameterName")
+ public int read(final byte[] b) {
+ return read(b, 0, b.length);
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:ParameterName")
+ public int read(final byte[] b, final int off, final int len) {
+ if (len < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (currentChunk == null) {
+ return -1;
+ }
+
+ final int result = Math.min(available, len);
+ int toOffset = off;
+ int toCopy = result;
+
+ while (toCopy != 0) {
+ final int count = currentBytes(toCopy);
+ System.arraycopy(currentChunk, currentOffset, b, toOffset, count);
+ consumeBytes(count);
+ toOffset += count;
+ toCopy -= count;
+ }
+
+ return result;
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:ParameterName")
+ public long skip(final long n) {
+ final int result = (int) Math.min(available, n);
+
+ int toSkip = result;
+ while (toSkip != 0) {
+ final int count = currentBytes(toSkip);
+ consumeBytes(count);
+ toSkip -= count;
+ }
+
+ return result;
+ }
+
+ private int currentBytes(final int desired) {
+ return Math.min(desired, currentLimit - currentOffset);
+ }
+
+ private void consumeBytes(final int count) {
+ currentOffset += count;
+ available -= count;
+
+ if (currentOffset == currentLimit) {
+ if (remainingChunks.hasNext()) {
+ currentChunk = remainingChunks.next();
+ currentLimit = currentChunk.length;
+ } else {
+ currentChunk = null;
+ currentLimit = 0;
+ }
+ currentOffset = 0;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.math.IntMath.ceilingPowerOfTwo;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.Iterator;
+import org.opendaylight.yangtools.concepts.Variant;
+
+/**
+ * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has
+ * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge
+ * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying).
+ *
+ * <p>
+ * This class takes a different approach: it recognizes that result of buffering will be collected at some point, when
+ * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
+ * <ul>
+ * <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
+ * buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
+ * {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
+ * is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
+ * <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
+ * final collection of buffers.</li>
+ * </ul>
+ *
+ * <p>
+ * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
+ * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
+ * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
+ * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
+ * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
+ * chunks is also well-controlled:
+ * <ul>
+ * <li>for slowly-built data, we will maintain perfect packing</li>
+ * <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
+ * </ul>
+ *
+ * @author Robert Varga
+ * @author Tomas Olvecky
+ */
+final class ChunkedOutputStream extends OutputStream {
+ static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
+ "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+ private static final int MIN_ARRAY_SIZE = 32;
+
+ // byte[] or a List
+ private Object result;
+ // Lazily-allocated to reduce pressure for single-chunk streams
+ private Deque<byte[]> prevChunks;
+
+ private byte[] currentChunk;
+ private int currentOffset;
+ private int size;
+
+ ChunkedOutputStream(final int requestedInitialCapacity) {
+ currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:ParameterName")
+ public void write(final int b) throws IOException {
+ checkNotClosed();
+ ensureOneByte();
+ currentChunk[currentOffset] = (byte) b;
+ currentOffset++;
+ size++;
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:ParameterName")
+ public void write(final byte[] b, final int off, final int len) throws IOException {
+ if (len < 0) {
+ throw new IndexOutOfBoundsException();
+ }
+ checkNotClosed();
+
+ int fromOffset = off;
+ int toCopy = len;
+
+ while (toCopy != 0) {
+ final int count = ensureMoreBytes(toCopy);
+ System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
+ currentOffset += count;
+ size += count;
+ fromOffset += count;
+ toCopy -= count;
+ }
+ }
+
+ @Override
+ public void close() {
+ if (result == null) {
+ result = computeResult();
+ prevChunks = null;
+ currentChunk = null;
+ }
+ }
+
+ int size() {
+ return size;
+ }
+
+ ChunkedByteArray toChunkedByteArray() {
+ checkClosed();
+ return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
+ : (ImmutableList<byte[]>) result);
+ }
+
+ Variant<byte[], ChunkedByteArray> toVariant() {
+ checkClosed();
+ return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
+ : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
+ }
+
+ private Object computeResult() {
+ if (prevChunks == null) {
+ // Simple case: it's only the current buffer, return that
+ return trimChunk(currentChunk, currentOffset);
+ }
+ if (size <= MAX_ARRAY_SIZE) {
+ // We have collected less than full chunk of data, let's have just one chunk ...
+ final byte[] singleChunk;
+ if (currentOffset == 0 && prevChunks.size() == 1) {
+ // ... which we have readily available
+ return prevChunks.getFirst();
+ }
+
+ // ... which we need to collect
+ singleChunk = new byte[size];
+ int offset = 0;
+ for (byte[] chunk : prevChunks) {
+ System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
+ offset += chunk.length;
+ }
+ System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
+ return singleChunk;
+ }
+
+ // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
+ // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
+ // end up trimming this array.
+ int headSize = 0;
+ int headCount = 0;
+ final Iterator<byte[]> it = prevChunks.iterator();
+ do {
+ final byte[] chunk = it.next();
+ if (chunk.length == MAX_ARRAY_SIZE) {
+ break;
+ }
+
+ headSize += chunk.length;
+ headCount++;
+ } while (it.hasNext());
+
+ // Compact initial chunks into a single one
+ final byte[] head = new byte[headSize];
+ int offset = 0;
+ for (int i = 0; i < headCount; ++i) {
+ final byte[] chunk = prevChunks.removeFirst();
+ System.arraycopy(chunk, 0, head, offset, chunk.length);
+ offset += chunk.length;
+ }
+ verify(offset == head.length);
+ prevChunks.addFirst(head);
+
+ // Now append the current chunk if need be, potentially trimming it
+ if (currentOffset == 0) {
+ return ImmutableList.copyOf(prevChunks);
+ }
+
+ final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
+ builder.addAll(prevChunks);
+ builder.add(trimChunk(currentChunk, currentOffset));
+ return builder.build();
+ }
+
+ // Ensure a single byte
+ private void ensureOneByte() {
+ if (currentChunk.length == currentOffset) {
+ nextChunk(nextChunkSize(currentChunk.length));
+ }
+ }
+
+ // Ensure more than one byte, returns the number of bytes available
+ private int ensureMoreBytes(final int requested) {
+ int available = currentChunk.length - currentOffset;
+ if (available == 0) {
+ nextChunk(nextChunkSize(currentChunk.length, requested));
+ available = currentChunk.length;
+ }
+ final int count = Math.min(requested, available);
+ verify(count > 0);
+ return count;
+ }
+
+ private void nextChunk(final int chunkSize) {
+ if (prevChunks == null) {
+ prevChunks = new ArrayDeque<>();
+ }
+
+ prevChunks.addLast(currentChunk);
+ currentChunk = new byte[chunkSize];
+ currentOffset = 0;
+ }
+
+ private void checkClosed() {
+ checkState(result != null, "Stream has not been closed yet");
+ }
+
+ private void checkNotClosed() throws IOException {
+ if (result != null) {
+ throw new IOException("Stream is already closed");
+ }
+ }
+
+ private static int nextChunkSize(final int currentSize, final int requested) {
+ return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
+ ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
+ }
+
+ private static int nextChunkSize(final int currentSize) {
+ return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
+ }
+
+ private static int initialCapacity(final int requestedSize) {
+ if (requestedSize < MIN_ARRAY_SIZE) {
+ return MIN_ARRAY_SIZE;
+ }
+ if (requestedSize > MAX_ARRAY_SIZE) {
+ return MAX_ARRAY_SIZE;
+ }
+ return ceilingPowerOfTwo(requestedSize);
+ }
+
+ private static byte[] trimChunk(final byte[] chunk, final int length) {
+ return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);
+ }
+}
*/
package org.opendaylight.controller.cluster.datastore.persisted;
+import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
+import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
+import java.io.StreamCorruptedException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Variant;
import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
* @author Robert Varga
*/
@Beta
-public final class CommitTransactionPayload extends Payload implements Serializable {
+public abstract class CommitTransactionPayload extends Payload implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
-
- private static final class Proxy implements Externalizable {
- private static final long serialVersionUID = 1L;
- private byte[] serialized;
-
- // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
- // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
- @SuppressWarnings("checkstyle:RedundantModifier")
- public Proxy() {
- // For Externalizable
- }
-
- Proxy(final byte[] serialized) {
- this.serialized = requireNonNull(serialized);
- }
-
- @Override
- public void writeExternal(final ObjectOutput out) throws IOException {
- out.writeInt(serialized.length);
- out.write(serialized);
- }
-
- @Override
- public void readExternal(final ObjectInput in) throws IOException {
- final int length = in.readInt();
- serialized = new byte[length];
- in.readFully(serialized);
- }
-
- private Object readResolve() {
- return new CommitTransactionPayload(serialized);
- }
- }
-
private static final long serialVersionUID = 1L;
- private final byte[] serialized;
+ CommitTransactionPayload() {
- CommitTransactionPayload(final byte[] serialized) {
- this.serialized = requireNonNull(serialized);
}
public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
final DataTreeCandidate candidate, final int initialSerializedBufferCapacity) throws IOException {
- final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
- transactionId.writeTo(out);
- DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate);
- final byte[] serialized = out.toByteArray();
- LOG.debug("Initial buffer capacity {}, actual serialized size {}",
- initialSerializedBufferCapacity, serialized.length);
+ final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity);
+ try (DataOutputStream dos = new DataOutputStream(cos)) {
+ transactionId.writeTo(dos);
+ DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, candidate);
+ }
- return new CommitTransactionPayload(serialized);
+ final Variant<byte[], ChunkedByteArray> source = cos.toVariant();
+ LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size());
+ return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond());
}
@VisibleForTesting
return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
}
- public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate(final ReusableStreamReceiver receiver)
- throws IOException {
- final DataInput in = ByteStreams.newDataInput(serialized);
+ public final Entry<TransactionIdentifier, DataTreeCandidate> getCandidate(
+ final ReusableStreamReceiver receiver) throws IOException {
+ final DataInput in = newDataInput();
return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
}
- @Override
- public int size() {
- return serialized.length;
+ abstract void writeBytes(ObjectOutput out) throws IOException;
+
+ abstract DataInput newDataInput();
+
+ final Object writeReplace() {
+ return new Proxy(this);
}
- private Object writeReplace() {
- return new Proxy(serialized);
+ private static final class Simple extends CommitTransactionPayload {
+ private static final long serialVersionUID = 1L;
+
+ private final byte[] serialized;
+
+ Simple(final byte[] serialized) {
+ this.serialized = requireNonNull(serialized);
+ }
+
+ @Override
+ public int size() {
+ return serialized.length;
+ }
+
+ @Override
+ DataInput newDataInput() {
+ return ByteStreams.newDataInput(serialized);
+ }
+
+ @Override
+ void writeBytes(final ObjectOutput out) throws IOException {
+ out.write(serialized);
+ }
+ }
+
+ private static final class Chunked extends CommitTransactionPayload {
+ private static final long serialVersionUID = 1L;
+
+ @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy")
+ private final ChunkedByteArray source;
+
+ Chunked(final ChunkedByteArray source) {
+ this.source = requireNonNull(source);
+ }
+
+ @Override
+ void writeBytes(final ObjectOutput out) throws IOException {
+ source.copyTo(out);
+ }
+
+ @Override
+ public int size() {
+ return source.size();
+ }
+
+ @Override
+ DataInput newDataInput() {
+ return new DataInputStream(source.openStream());
+ }
+ }
+
+ private static final class Proxy implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ private CommitTransactionPayload payload;
+
+ // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+ // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public Proxy() {
+ // For Externalizable
+ }
+
+ Proxy(final CommitTransactionPayload payload) {
+ this.payload = requireNonNull(payload);
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeInt(payload.size());
+ payload.writeBytes(out);
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException {
+ final int length = in.readInt();
+ if (length < 0) {
+ throw new StreamCorruptedException("Invalid payload length " + length);
+ } else if (length < MAX_ARRAY_SIZE) {
+ final byte[] serialized = new byte[length];
+ in.readFully(serialized);
+ payload = new Simple(serialized);
+ } else {
+ payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE));
+ }
+ }
+
+ private Object readResolve() {
+ return verifyNotNull(payload);
+ }
}
}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class ChunkedOutputStreamTest {
+ private static final int INITIAL_SIZE = 256;
+
+ private final ChunkedOutputStream stream = new ChunkedOutputStream(INITIAL_SIZE);
+
+ @Test
+ public void testBasicWrite() throws IOException {
+ for (int i = 0; i < INITIAL_SIZE; ++i) {
+ stream.write(i);
+ }
+
+ final byte[] chunk = assertFinishedStream(INITIAL_SIZE, 1).get(0);
+ assertEquals(INITIAL_SIZE, chunk.length);
+ for (int i = 0; i < INITIAL_SIZE; ++i) {
+ assertEquals((byte) i, chunk[i]);
+ }
+ }
+
+ @Test
+ public void testBasicLargeWrite() throws IOException {
+ final byte[] array = createArray(INITIAL_SIZE);
+ stream.write(array);
+ final byte[] chunk = assertFinishedStream(INITIAL_SIZE, 1).get(0);
+ assertArrayEquals(array, chunk);
+ }
+
+ @Test
+ public void testGrowWrite() throws IOException {
+ for (int i = 0; i < INITIAL_SIZE * 2; ++i) {
+ stream.write(i);
+ }
+
+ final byte[] chunk = assertFinishedStream(INITIAL_SIZE * 2, 1).get(0);
+ assertEquals(INITIAL_SIZE * 2, chunk.length);
+ for (int i = 0; i < INITIAL_SIZE * 2; ++i) {
+ assertEquals((byte) i, chunk[i]);
+ }
+ }
+
+ @Test
+ public void testGrowLargeWrite() throws IOException {
+ final byte[] array = createArray(INITIAL_SIZE * 2);
+ stream.write(array);
+ final byte[] chunk = assertFinishedStream(INITIAL_SIZE * 2, 1).get(0);
+ assertArrayEquals(array, chunk);
+ }
+
+ @Test
+ public void testTwoChunksWrite() throws IOException {
+ int size = ChunkedOutputStream.MAX_ARRAY_SIZE + 1;
+ for (int i = 0; i < size; ++i) {
+ stream.write(i);
+ }
+
+ int counter = 0;
+ for (byte[] chunk: assertFinishedStream(size, 2)) {
+ for (byte actual: chunk) {
+ assertEquals((byte) counter++, actual);
+ }
+ }
+ }
+
+ private List<byte[]> assertFinishedStream(final int expectedSize, final int expectedChunks) {
+ stream.close();
+ final ChunkedByteArray array = stream.toChunkedByteArray();
+ assertEquals(expectedSize, array.size());
+
+ final List<byte[]> chunks = array.getChunks();
+ assertEquals(expectedChunks, chunks.size());
+ return chunks;
+ }
+
+ private static byte[] createArray(final int size) {
+ final byte[] array = new byte[size];
+ for (int i = 0; i < size; ++i) {
+ array[i] = (byte) i;
+ }
+ return array;
+ }
+}