atomic-storage: remove type dependency at segment level I/O
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / persisted / ChunkedOutputStream.java
1 /*
2  * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.persisted;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
13
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.ImmutableList.Builder;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.OutputStream;
19 import java.util.ArrayDeque;
20 import java.util.Arrays;
21 import java.util.Deque;
22 import java.util.Iterator;
23 import org.opendaylight.yangtools.concepts.Variant;
24
25 /**
26  * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has
27  * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge
28  * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying).
29  *
30  * <p>
31  * This class takes a different approach: it recognizes that result of buffering will be collected at some point, when
32  * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
33  * <ul>
34  *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
35  *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
36  *       {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
37  *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
38  *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
39  *       final collection of buffers.</li>
40  * </ul>
41  *
42  * <p>
43  * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
44  * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
45  * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
46  * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
47  * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
48  * chunks is also well-controlled:
49  * <ul>
50  *   <li>for slowly-built data, we will maintain perfect packing</li>
51  *   <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
52  * </ul>
53  *
54  * @author Robert Varga
55  * @author Tomas Olvecky
56  */
57 final class ChunkedOutputStream extends OutputStream {
58     static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
59         "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
60     private static final int MIN_ARRAY_SIZE = 32;
61
62     // byte[] or a List
63     private Object result;
64     // Lazily-allocated to reduce pressure for single-chunk streams
65     private Deque<byte[]> prevChunks;
66
67     private byte[] currentChunk;
68     private int currentOffset;
69     private int size;
70
71     ChunkedOutputStream(final int requestedInitialCapacity) {
72         currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
73     }
74
75     @Override
76     @SuppressWarnings("checkstyle:ParameterName")
77     public void write(final int b) throws IOException {
78         checkNotClosed();
79         ensureOneByte();
80         currentChunk[currentOffset] = (byte) b;
81         currentOffset++;
82         size++;
83     }
84
85     @Override
86     @SuppressWarnings("checkstyle:ParameterName")
87     public void write(final byte[] b, final int off, final int len) throws IOException {
88         if (len < 0) {
89             throw new IndexOutOfBoundsException();
90         }
91         checkNotClosed();
92
93         int fromOffset = off;
94         int toCopy = len;
95
96         while (toCopy != 0) {
97             final int count = ensureMoreBytes(toCopy);
98             System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
99             currentOffset += count;
100             size += count;
101             fromOffset += count;
102             toCopy -= count;
103         }
104     }
105
106     @Override
107     public void close() {
108         if (result == null) {
109             result = computeResult();
110             prevChunks = null;
111             currentChunk = null;
112         }
113     }
114
115     int size() {
116         return size;
117     }
118
119     ChunkedByteArray toChunkedByteArray() {
120         checkClosed();
121         return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
122             : (ImmutableList<byte[]>) result);
123     }
124
125     Variant<byte[], ChunkedByteArray> toVariant() {
126         checkClosed();
127         return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
128                 : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
129     }
130
131     private Object computeResult() {
132         if (prevChunks == null) {
133             // Simple case: it's only the current buffer, return that
134             return trimChunk(currentChunk, currentOffset);
135         }
136         if (size <= MAX_ARRAY_SIZE) {
137             // We have collected less than full chunk of data, let's have just one chunk ...
138             final byte[] singleChunk;
139             if (currentOffset == 0 && prevChunks.size() == 1) {
140                 // ... which we have readily available
141                 return prevChunks.getFirst();
142             }
143
144             // ... which we need to collect
145             singleChunk = new byte[size];
146             int offset = 0;
147             for (byte[] chunk : prevChunks) {
148                 System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
149                 offset += chunk.length;
150             }
151             System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
152             return singleChunk;
153         }
154
155         // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
156         // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
157         // end up trimming this array.
158         int headSize = 0;
159         int headCount = 0;
160         final Iterator<byte[]> it = prevChunks.iterator();
161         do {
162             final byte[] chunk = it.next();
163             if (chunk.length == MAX_ARRAY_SIZE) {
164                 break;
165             }
166
167             headSize += chunk.length;
168             headCount++;
169         } while (it.hasNext());
170
171         // Compact initial chunks into a single one
172         final byte[] head = new byte[headSize];
173         int offset = 0;
174         for (int i = 0; i < headCount; ++i) {
175             final byte[] chunk = prevChunks.removeFirst();
176             System.arraycopy(chunk, 0, head, offset, chunk.length);
177             offset += chunk.length;
178         }
179         verify(offset == head.length);
180         prevChunks.addFirst(head);
181
182         // Now append the current chunk if need be, potentially trimming it
183         if (currentOffset == 0) {
184             return ImmutableList.copyOf(prevChunks);
185         }
186
187         final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
188         builder.addAll(prevChunks);
189         builder.add(trimChunk(currentChunk, currentOffset));
190         return builder.build();
191     }
192
193     // Ensure a single byte
194     private void ensureOneByte() {
195         if (currentChunk.length == currentOffset) {
196             nextChunk(nextChunkSize(currentChunk.length));
197         }
198     }
199
200     // Ensure more than one byte, returns the number of bytes available
201     private int ensureMoreBytes(final int requested) {
202         int available = currentChunk.length - currentOffset;
203         if (available == 0) {
204             nextChunk(nextChunkSize(currentChunk.length, requested));
205             available = currentChunk.length;
206         }
207         final int count = Math.min(requested, available);
208         verify(count > 0);
209         return count;
210     }
211
212     private void nextChunk(final int chunkSize) {
213         if (prevChunks == null) {
214             prevChunks = new ArrayDeque<>();
215         }
216
217         prevChunks.addLast(currentChunk);
218         currentChunk = new byte[chunkSize];
219         currentOffset = 0;
220     }
221
222     private void checkClosed() {
223         checkState(result != null, "Stream has not been closed yet");
224     }
225
226     private void checkNotClosed() throws IOException {
227         if (result != null) {
228             throw new IOException("Stream is already closed");
229         }
230     }
231
232     private static int nextChunkSize(final int currentSize, final int requested) {
233         return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
234                 ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
235     }
236
237     private static int nextChunkSize(final int currentSize) {
238         return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
239     }
240
241     private static int initialCapacity(final int requestedSize) {
242         if (requestedSize < MIN_ARRAY_SIZE) {
243             return MIN_ARRAY_SIZE;
244         }
245         if (requestedSize > MAX_ARRAY_SIZE) {
246             return MAX_ARRAY_SIZE;
247         }
248         return ceilingPowerOfTwo(requestedSize);
249     }
250
251     private static byte[] trimChunk(final byte[] chunk, final int length) {
252         return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);
253     }
254 }