8215e23ff371d290846a3e2d75528847219c2e0d
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / io / ChunkedOutputStream.java
1 /*
2  * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.io;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static com.google.common.base.Verify.verify;
13 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
14 import static com.google.common.math.IntMath.isPowerOfTwo;
15
16 import com.google.common.annotations.Beta;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.ImmutableList.Builder;
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.io.OutputStream;
23 import java.util.ArrayDeque;
24 import java.util.Arrays;
25 import java.util.Deque;
26 import java.util.Iterator;
27 import org.opendaylight.yangtools.concepts.Variant;
28
29 /**
30  * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has
31  * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge
32  * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying).
33  *
34  * <p>
35  * This class takes a different approach: it recognizes that result of buffering will be collected at some point, when
36  * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
37  * <ul>
38  *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
39  *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
40  *       {@code maxChunkSize}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
41  *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
42  *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
43  *       final collection of buffers.</li>
44  * </ul>
45  *
46  * <p>
47  * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
48  * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
49  * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which replaces
50  * them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm guarantees total
51  * number of internal copy operations is capped at {@code 2 * maxChunkSize}. The number of produced chunks is also
52  * well-controlled:
53  * <ul>
54  *   <li>for slowly-built data, we will maintain perfect packing</li>
55  *   <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
56  * </ul>
57  *
58  * @author Robert Varga
59  * @author Tomas Olvecky
60  */
61 @Beta
62 public final class ChunkedOutputStream extends OutputStream {
63     private static final int MIN_ARRAY_SIZE = 32;
64
65     private final int maxChunkSize;
66
67     // byte[] or a List
68     private Object result;
69     // Lazily-allocated to reduce pressure for single-chunk streams
70     private Deque<byte[]> prevChunks;
71
72     private byte[] currentChunk;
73     private int currentOffset;
74     private int size;
75
76     public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) {
77         checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize);
78         checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize);
79         this.maxChunkSize = maxChunkSize;
80         currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)];
81     }
82
83     @Override
84     @SuppressWarnings("checkstyle:ParameterName")
85     public void write(final int b) throws IOException {
86         checkNotClosed();
87         ensureOneByte();
88         currentChunk[currentOffset] = (byte) b;
89         currentOffset++;
90         size++;
91     }
92
93     @Override
94     @SuppressWarnings("checkstyle:ParameterName")
95     public void write(final byte[] b, final int off, final int len) throws IOException {
96         if (len < 0) {
97             throw new IndexOutOfBoundsException();
98         }
99         checkNotClosed();
100
101         int fromOffset = off;
102         int toCopy = len;
103
104         while (toCopy != 0) {
105             final int count = ensureMoreBytes(toCopy);
106             System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
107             currentOffset += count;
108             size += count;
109             fromOffset += count;
110             toCopy -= count;
111         }
112     }
113
114     @Override
115     public void close() {
116         if (result == null) {
117             result = computeResult();
118             prevChunks = null;
119             currentChunk = null;
120         }
121     }
122
123     public int size() {
124         return size;
125     }
126
127     public Variant<byte[], ChunkedByteArray> toVariant() {
128         checkClosed();
129         return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
130                 : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
131     }
132
133     @VisibleForTesting
134     ChunkedByteArray toChunkedByteArray() {
135         checkClosed();
136         return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
137             : (ImmutableList<byte[]>) result);
138     }
139
140     private Object computeResult() {
141         if (prevChunks == null) {
142             // Simple case: it's only the current buffer, return that
143             return trimChunk(currentChunk, currentOffset);
144         }
145         if (size <= maxChunkSize) {
146             // We have collected less than full chunk of data, let's have just one chunk ...
147             final byte[] singleChunk;
148             if (currentOffset == 0 && prevChunks.size() == 1) {
149                 // ... which we have readily available
150                 return prevChunks.getFirst();
151             }
152
153             // ... which we need to collect
154             singleChunk = new byte[size];
155             int offset = 0;
156             for (byte[] chunk : prevChunks) {
157                 System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
158                 offset += chunk.length;
159             }
160             System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
161             return singleChunk;
162         }
163
164         // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
165         // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
166         // end up trimming this array.
167         int headSize = 0;
168         int headCount = 0;
169         final Iterator<byte[]> it = prevChunks.iterator();
170         do {
171             final byte[] chunk = it.next();
172             if (chunk.length == maxChunkSize) {
173                 break;
174             }
175
176             headSize += chunk.length;
177             headCount++;
178         } while (it.hasNext());
179
180         // Compact initial chunks into a single one
181         final byte[] head = new byte[headSize];
182         int offset = 0;
183         for (int i = 0; i < headCount; ++i) {
184             final byte[] chunk = prevChunks.removeFirst();
185             System.arraycopy(chunk, 0, head, offset, chunk.length);
186             offset += chunk.length;
187         }
188         verify(offset == head.length);
189         prevChunks.addFirst(head);
190
191         // Now append the current chunk if need be, potentially trimming it
192         if (currentOffset == 0) {
193             return ImmutableList.copyOf(prevChunks);
194         }
195
196         final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
197         builder.addAll(prevChunks);
198         builder.add(trimChunk(currentChunk, currentOffset));
199         return builder.build();
200     }
201
202     // Ensure a single byte
203     private void ensureOneByte() {
204         if (currentChunk.length == currentOffset) {
205             nextChunk(nextChunkSize(currentChunk.length));
206         }
207     }
208
209     // Ensure more than one byte, returns the number of bytes available
210     private int ensureMoreBytes(final int requested) {
211         int available = currentChunk.length - currentOffset;
212         if (available == 0) {
213             nextChunk(nextChunkSize(currentChunk.length, requested));
214             available = currentChunk.length;
215         }
216         final int count = Math.min(requested, available);
217         verify(count > 0);
218         return count;
219     }
220
221     private void nextChunk(final int chunkSize) {
222         if (prevChunks == null) {
223             prevChunks = new ArrayDeque<>();
224         }
225
226         prevChunks.addLast(currentChunk);
227         currentChunk = new byte[chunkSize];
228         currentOffset = 0;
229     }
230
231     private void checkClosed() {
232         checkState(result != null, "Stream has not been closed yet");
233     }
234
235     private void checkNotClosed() throws IOException {
236         if (result != null) {
237             throw new IOException("Stream is already closed");
238         }
239     }
240
241     private int nextChunkSize(final int currentSize, final int requested) {
242         return currentSize == maxChunkSize || requested >= maxChunkSize
243                 ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
244     }
245
246     private int nextChunkSize(final int currentSize) {
247         return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize;
248     }
249
250     private static int initialCapacity(final int requestedSize, final int maxChunkSize) {
251         if (requestedSize < MIN_ARRAY_SIZE) {
252             return MIN_ARRAY_SIZE;
253         }
254         if (requestedSize > maxChunkSize) {
255             return maxChunkSize;
256         }
257         return ceilingPowerOfTwo(requestedSize);
258     }
259
260     private static byte[] trimChunk(final byte[] chunk, final int length) {
261         return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);
262     }
263 }