/*
 * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.persisted;
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.ImmutableList.Builder;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.OutputStream;
19 import java.util.ArrayDeque;
20 import java.util.Arrays;
21 import java.util.Deque;
22 import java.util.Iterator;
23 import org.opendaylight.yangtools.concepts.Variant;
/**
 * An {@link OutputStream} implementation which collects data in a series of {@code byte[]} chunks, each of which has
 * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge
 * byte arrays -- which can create unnecessary pressure on the GC (as well as a lot of copying).
 *
 * <p>
 * This class takes a different approach: it recognizes that the result of buffering will be collected at some point,
 * when the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
 * <ol>
 *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once
 *       the buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
 *       {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffers. We also can skip some intermediate sizes if data
 *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
 *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
 *       final collection of buffers.</li>
 * </ol>
 *
 * <p>
 * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
 * they hit maximum size. An intrinsic property here is that the total capacity of chunks created during the ramp up
 * is guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
 * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
 * guarantees the total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of
 * produced chunks is also well-controlled:
 * <ul>
 *   <li>for slowly-built data, we will maintain perfect packing</li>
 *   <li>for fast-startup data, we will be at most one chunk away from packing perfectly</li>
 * </ul>
 *
 * @author Robert Varga
 * @author Tomas Olvecky
 */
57 final class ChunkedOutputStream extends OutputStream {
58 static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
59 "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
60 private static final int MIN_ARRAY_SIZE = 32;
63 private Object result;
64 // Lazily-allocated to reduce pressure for single-chunk streams
65 private Deque<byte[]> prevChunks;
67 private byte[] currentChunk;
68 private int currentOffset;
71 ChunkedOutputStream(final int requestedInitialCapacity) {
72 currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
76 @SuppressWarnings("checkstyle:ParameterName")
77 public void write(final int b) throws IOException {
80 currentChunk[currentOffset] = (byte) b;
86 @SuppressWarnings("checkstyle:ParameterName")
87 public void write(final byte[] b, final int off, final int len) throws IOException {
89 throw new IndexOutOfBoundsException();
97 final int count = ensureMoreBytes(toCopy);
98 System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
99 currentOffset += count;
107 public void close() {
108 if (result == null) {
109 result = computeResult();
119 ChunkedByteArray toChunkedByteArray() {
121 return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
122 : (ImmutableList<byte[]>) result);
125 Variant<byte[], ChunkedByteArray> toVariant() {
127 return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
128 : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
131 private Object computeResult() {
132 if (prevChunks == null) {
133 // Simple case: it's only the current buffer, return that
134 return trimChunk(currentChunk, currentOffset);
136 if (size <= MAX_ARRAY_SIZE) {
137 // We have collected less than full chunk of data, let's have just one chunk ...
138 final byte[] singleChunk;
139 if (currentOffset == 0 && prevChunks.size() == 1) {
140 // ... which we have readily available
141 return prevChunks.getFirst();
144 // ... which we need to collect
145 singleChunk = new byte[size];
147 for (byte[] chunk : prevChunks) {
148 System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
149 offset += chunk.length;
151 System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
155 // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
156 // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
157 // end up trimming this array.
160 final Iterator<byte[]> it = prevChunks.iterator();
162 final byte[] chunk = it.next();
163 if (chunk.length == MAX_ARRAY_SIZE) {
167 headSize += chunk.length;
169 } while (it.hasNext());
171 // Compact initial chunks into a single one
172 final byte[] head = new byte[headSize];
174 for (int i = 0; i < headCount; ++i) {
175 final byte[] chunk = prevChunks.removeFirst();
176 System.arraycopy(chunk, 0, head, offset, chunk.length);
177 offset += chunk.length;
179 verify(offset == head.length);
180 prevChunks.addFirst(head);
182 // Now append the current chunk if need be, potentially trimming it
183 if (currentOffset == 0) {
184 return ImmutableList.copyOf(prevChunks);
187 final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
188 builder.addAll(prevChunks);
189 builder.add(trimChunk(currentChunk, currentOffset));
190 return builder.build();
193 // Ensure a single byte
194 private void ensureOneByte() {
195 if (currentChunk.length == currentOffset) {
196 nextChunk(nextChunkSize(currentChunk.length));
200 // Ensure more than one byte, returns the number of bytes available
201 private int ensureMoreBytes(final int requested) {
202 int available = currentChunk.length - currentOffset;
203 if (available == 0) {
204 nextChunk(nextChunkSize(currentChunk.length, requested));
205 available = currentChunk.length;
207 final int count = Math.min(requested, available);
212 private void nextChunk(final int chunkSize) {
213 if (prevChunks == null) {
214 prevChunks = new ArrayDeque<>();
217 prevChunks.addLast(currentChunk);
218 currentChunk = new byte[chunkSize];
222 private void checkClosed() {
223 checkState(result != null, "Stream has not been closed yet");
226 private void checkNotClosed() throws IOException {
227 if (result != null) {
228 throw new IOException("Stream is already closed");
232 private static int nextChunkSize(final int currentSize, final int requested) {
233 return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
234 ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
237 private static int nextChunkSize(final int currentSize) {
238 return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
241 private static int initialCapacity(final int requestedSize) {
242 if (requestedSize < MIN_ARRAY_SIZE) {
243 return MIN_ARRAY_SIZE;
245 if (requestedSize > MAX_ARRAY_SIZE) {
246 return MAX_ARRAY_SIZE;
248 return ceilingPowerOfTwo(requestedSize);
251 private static byte[] trimChunk(final byte[] chunk, final int length) {
252 return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);