2 * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.io;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verify;
13 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
14 import static com.google.common.math.IntMath.isPowerOfTwo;
16 import com.google.common.annotations.Beta;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.ImmutableList.Builder;
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.util.ArrayDeque;
24 import java.util.Arrays;
25 import java.util.Deque;
26 import java.util.Iterator;
27 import org.opendaylight.yangtools.concepts.Variant;
 * An {@link OutputStream} implementation which collects data in a series of {@code byte[]} chunks, each of which has
31 * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge
32 * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying).
35 * This class takes a different approach: it recognizes that result of buffering will be collected at some point, when
36 * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
38 * <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
39 * buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
 * {@code maxChunkSize}, we do not grow subsequent buffers. We also can skip some intermediate sizes if data
41 * is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
42 * <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
43 * final collection of buffers.</li>
47 * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
48 * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
49 * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which replaces
50 * them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm guarantees total
51 * number of internal copy operations is capped at {@code 2 * maxChunkSize}. The number of produced chunks is also
54 * <li>for slowly-built data, we will maintain perfect packing</li>
 * <li>for fast-startup data, we will be at most one chunk away from packing perfectly</li>
58 * @author Robert Varga
59 * @author Tomas Olvecky
62 public final class ChunkedOutputStream extends OutputStream {
// Never allocate a chunk smaller than this many bytes
private static final int MIN_ARRAY_SIZE = 32;

// Maximum size of any single chunk; validated as a power of two in the constructor
private final int maxChunkSize;

// Set by close(): either a single byte[] (single-chunk result) or an ImmutableList<byte[]>.
// Doubles as the 'closed' indicator: non-null means the stream has been closed.
private Object result;
// Lazily-allocated to reduce pressure for single-chunk streams
private Deque<byte[]> prevChunks;
// Chunk currently being filled, and the next write position within it
private byte[] currentChunk;
private int currentOffset;
/**
 * Create a new stream with the specified initial capacity request and chunk size limit.
 *
 * @param requestedInitialCapacity requested capacity of the first chunk; clamped to
 *                                 {@code [MIN_ARRAY_SIZE, maxChunkSize]} and rounded up to a power of two
 * @param maxChunkSize maximum size of any chunk, must be a positive power of two
 * @throws IllegalArgumentException if {@code maxChunkSize} is not a positive power of two
 */
public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) {
    // Validate positivity first: isPowerOfTwo() returns false for all non-positive values, so
    // checking it first would report a misleading "not a power of two" for, e.g., zero sizes.
    checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize);
    checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize);
    this.maxChunkSize = maxChunkSize;
    currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)];
}
84 @SuppressWarnings("checkstyle:ParameterName")
85 public void write(final int b) throws IOException {
88 currentChunk[currentOffset] = (byte) b;
94 @SuppressWarnings("checkstyle:ParameterName")
95 public void write(final byte[] b, final int off, final int len) throws IOException {
97 throw new IndexOutOfBoundsException();
101 int fromOffset = off;
104 while (toCopy != 0) {
105 final int count = ensureMoreBytes(toCopy);
106 System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
107 currentOffset += count;
115 public void close() {
116 if (result == null) {
117 result = computeResult();
127 public Variant<byte[], ChunkedByteArray> toVariant() {
129 return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
130 : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
134 ChunkedByteArray toChunkedByteArray() {
136 return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
137 : (ImmutableList<byte[]>) result);
140 private Object computeResult() {
141 if (prevChunks == null) {
142 // Simple case: it's only the current buffer, return that
143 return trimChunk(currentChunk, currentOffset);
145 if (size <= maxChunkSize) {
146 // We have collected less than full chunk of data, let's have just one chunk ...
147 final byte[] singleChunk;
148 if (currentOffset == 0 && prevChunks.size() == 1) {
149 // ... which we have readily available
150 return prevChunks.getFirst();
153 // ... which we need to collect
154 singleChunk = new byte[size];
156 for (byte[] chunk : prevChunks) {
157 System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
158 offset += chunk.length;
160 System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
164 // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
165 // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
166 // end up trimming this array.
169 final Iterator<byte[]> it = prevChunks.iterator();
171 final byte[] chunk = it.next();
172 if (chunk.length == maxChunkSize) {
176 headSize += chunk.length;
178 } while (it.hasNext());
180 // Compact initial chunks into a single one
181 final byte[] head = new byte[headSize];
183 for (int i = 0; i < headCount; ++i) {
184 final byte[] chunk = prevChunks.removeFirst();
185 System.arraycopy(chunk, 0, head, offset, chunk.length);
186 offset += chunk.length;
188 verify(offset == head.length);
189 prevChunks.addFirst(head);
191 // Now append the current chunk if need be, potentially trimming it
192 if (currentOffset == 0) {
193 return ImmutableList.copyOf(prevChunks);
196 final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
197 builder.addAll(prevChunks);
198 builder.add(trimChunk(currentChunk, currentOffset));
199 return builder.build();
202 // Ensure a single byte
203 private void ensureOneByte() {
204 if (currentChunk.length == currentOffset) {
205 nextChunk(nextChunkSize(currentChunk.length));
209 // Ensure more than one byte, returns the number of bytes available
210 private int ensureMoreBytes(final int requested) {
211 int available = currentChunk.length - currentOffset;
212 if (available == 0) {
213 nextChunk(nextChunkSize(currentChunk.length, requested));
214 available = currentChunk.length;
216 final int count = Math.min(requested, available);
221 private void nextChunk(final int chunkSize) {
222 if (prevChunks == null) {
223 prevChunks = new ArrayDeque<>();
226 prevChunks.addLast(currentChunk);
227 currentChunk = new byte[chunkSize];
231 private void checkClosed() {
232 checkState(result != null, "Stream has not been closed yet");
235 private void checkNotClosed() throws IOException {
236 if (result != null) {
237 throw new IOException("Stream is already closed");
241 private int nextChunkSize(final int currentSize, final int requested) {
242 return currentSize == maxChunkSize || requested >= maxChunkSize
243 ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
246 private int nextChunkSize(final int currentSize) {
247 return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize;
250 private static int initialCapacity(final int requestedSize, final int maxChunkSize) {
251 if (requestedSize < MIN_ARRAY_SIZE) {
252 return MIN_ARRAY_SIZE;
254 if (requestedSize > maxChunkSize) {
257 return ceilingPowerOfTwo(requestedSize);
260 private static byte[] trimChunk(final byte[] chunk, final int length) {
261 return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);