* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore.persisted;
+package org.opendaylight.controller.cluster.io;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.math.IntMath.ceilingPowerOfTwo;
+import static com.google.common.math.IntMath.isPowerOfTwo;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import java.io.ByteArrayOutputStream;
* <ul>
* <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
* buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
- * {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
+ * {@code maxChunkSize}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
* is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
* <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
* final collection of buffers.</li>
* <p>
* The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
* they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
- * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
- * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
- * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
- * chunks is also well-controlled:
+ * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which replaces
+ * them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm guarantees total
+ * number of internal copy operations is capped at {@code 2 * maxChunkSize}. The number of produced chunks is also
+ * well-controlled:
* <ul>
* <li>for slowly-built data, we will maintain perfect packing</li>
* <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
* @author Robert Varga
* @author Tomas Olvecky
*/
-final class ChunkedOutputStream extends OutputStream {
- static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
- "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+@Beta
+public final class ChunkedOutputStream extends OutputStream {
private static final int MIN_ARRAY_SIZE = 32;
+ private final int maxChunkSize;
+
// byte[] or a List
private Object result;
// Lazily-allocated to reduce pressure for single-chunk streams
private int currentOffset;
private int size;
- ChunkedOutputStream(final int requestedInitialCapacity) {
- currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
+ public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) {
+ checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize);
+ checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize);
+ this.maxChunkSize = maxChunkSize;
+ currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)];
}
@Override
}
}
- int size() {
+ public int size() {
return size;
}
- ChunkedByteArray toChunkedByteArray() {
+ public Variant<byte[], ChunkedByteArray> toVariant() {
checkClosed();
- return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
- : (ImmutableList<byte[]>) result);
+ return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
+ : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
}
- Variant<byte[], ChunkedByteArray> toVariant() {
+ @VisibleForTesting
+ ChunkedByteArray toChunkedByteArray() {
checkClosed();
- return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
- : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
+ return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
+ : (ImmutableList<byte[]>) result);
}
private Object computeResult() {
// Simple case: it's only the current buffer, return that
return trimChunk(currentChunk, currentOffset);
}
- if (size <= MAX_ARRAY_SIZE) {
+ if (size <= maxChunkSize) {
// We have collected less than full chunk of data, let's have just one chunk ...
final byte[] singleChunk;
if (currentOffset == 0 && prevChunks.size() == 1) {
final Iterator<byte[]> it = prevChunks.iterator();
do {
final byte[] chunk = it.next();
- if (chunk.length == MAX_ARRAY_SIZE) {
+ if (chunk.length == maxChunkSize) {
break;
}
}
}
- private static int nextChunkSize(final int currentSize, final int requested) {
- return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
- ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
+ private int nextChunkSize(final int currentSize, final int requested) {
+ return currentSize == maxChunkSize || requested >= maxChunkSize
+ ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
}
- private static int nextChunkSize(final int currentSize) {
- return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
+ private int nextChunkSize(final int currentSize) {
+ return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize;
}
- private static int initialCapacity(final int requestedSize) {
+ private static int initialCapacity(final int requestedSize, final int maxChunkSize) {
if (requestedSize < MIN_ARRAY_SIZE) {
return MIN_ARRAY_SIZE;
}
- if (requestedSize > MAX_ARRAY_SIZE) {
- return MAX_ARRAY_SIZE;
+ if (requestedSize > maxChunkSize) {
+ return maxChunkSize;
}
return ceilingPowerOfTwo(requestedSize);
}