Rehost Chunked{ByteArray,InputStream,OutputStream}
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / io / ChunkedOutputStream.java
@@ -5,12 +5,16 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore.persisted;
+package org.opendaylight.controller.cluster.io;
 
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
+import static com.google.common.math.IntMath.isPowerOfTwo;
 
 
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 import java.io.ByteArrayOutputStream;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 import java.io.ByteArrayOutputStream;
@@ -33,7 +37,7 @@ import org.opendaylight.yangtools.concepts.Variant;
  * <ul>
  *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
  *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
  * <ul>
  *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
  *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
- *       {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
+ *       {@code maxChunkSize}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
  *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
  *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
  *       final collection of buffers.</li>
  *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
  *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
  *       final collection of buffers.</li>
@@ -42,10 +46,10 @@ import org.opendaylight.yangtools.concepts.Variant;
  * <p>
  * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
  * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
  * <p>
  * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
  * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
- * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
- * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
- * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
- * chunks is also well-controlled:
+ * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which replaces
+ * them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm guarantees total
+ * number of internal copy operations is capped at {@code 2 * maxChunkSize}. The number of produced chunks is also
+ * well-controlled:
  * <ul>
  *   <li>for slowly-built data, we will maintain perfect packing</li>
  *   <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
  * <ul>
  *   <li>for slowly-built data, we will maintain perfect packing</li>
  *   <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
@@ -54,11 +58,12 @@ import org.opendaylight.yangtools.concepts.Variant;
  * @author Robert Varga
  * @author Tomas Olvecky
  */
  * @author Robert Varga
  * @author Tomas Olvecky
  */
-final class ChunkedOutputStream extends OutputStream {
-    static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
-        "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+@Beta
+public final class ChunkedOutputStream extends OutputStream {
     private static final int MIN_ARRAY_SIZE = 32;
 
     private static final int MIN_ARRAY_SIZE = 32;
 
+    private final int maxChunkSize;
+
     // byte[] or a List
     private Object result;
     // Lazily-allocated to reduce pressure for single-chunk streams
     // byte[] or a List
     private Object result;
     // Lazily-allocated to reduce pressure for single-chunk streams
@@ -68,8 +73,11 @@ final class ChunkedOutputStream extends OutputStream {
     private int currentOffset;
     private int size;
 
     private int currentOffset;
     private int size;
 
-    ChunkedOutputStream(final int requestedInitialCapacity) {
-        currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
+    public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) {
+        checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize);
+        checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize);
+        this.maxChunkSize = maxChunkSize;
+        currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)];
     }
 
     @Override
     }
 
     @Override
@@ -112,20 +120,21 @@ final class ChunkedOutputStream extends OutputStream {
         }
     }
 
         }
     }
 
-    int size() {
+    public int size() {
         return size;
     }
 
         return size;
     }
 
-    ChunkedByteArray toChunkedByteArray() {
+    public Variant<byte[], ChunkedByteArray> toVariant() {
         checkClosed();
         checkClosed();
-        return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
-            : (ImmutableList<byte[]>) result);
+        return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
+                : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
     }
 
     }
 
-    Variant<byte[], ChunkedByteArray> toVariant() {
+    @VisibleForTesting
+    ChunkedByteArray toChunkedByteArray() {
         checkClosed();
         checkClosed();
-        return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
-                : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
+        return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
+            : (ImmutableList<byte[]>) result);
     }
 
     private Object computeResult() {
     }
 
     private Object computeResult() {
@@ -133,7 +142,7 @@ final class ChunkedOutputStream extends OutputStream {
             // Simple case: it's only the current buffer, return that
             return trimChunk(currentChunk, currentOffset);
         }
             // Simple case: it's only the current buffer, return that
             return trimChunk(currentChunk, currentOffset);
         }
-        if (size <= MAX_ARRAY_SIZE) {
+        if (size <= maxChunkSize) {
             // We have collected less than full chunk of data, let's have just one chunk ...
             final byte[] singleChunk;
             if (currentOffset == 0 && prevChunks.size() == 1) {
             // We have collected less than full chunk of data, let's have just one chunk ...
             final byte[] singleChunk;
             if (currentOffset == 0 && prevChunks.size() == 1) {
@@ -160,7 +169,7 @@ final class ChunkedOutputStream extends OutputStream {
         final Iterator<byte[]> it = prevChunks.iterator();
         do {
             final byte[] chunk = it.next();
         final Iterator<byte[]> it = prevChunks.iterator();
         do {
             final byte[] chunk = it.next();
-            if (chunk.length == MAX_ARRAY_SIZE) {
+            if (chunk.length == maxChunkSize) {
                 break;
             }
 
                 break;
             }
 
@@ -229,21 +238,21 @@ final class ChunkedOutputStream extends OutputStream {
         }
     }
 
         }
     }
 
-    private static int nextChunkSize(final int currentSize, final int requested) {
-        return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
-                ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
+    private int nextChunkSize(final int currentSize, final int requested) {
+        return currentSize == maxChunkSize || requested >= maxChunkSize
+                ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
     }
 
     }
 
-    private static int nextChunkSize(final int currentSize) {
-        return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
+    private int nextChunkSize(final int currentSize) {
+        return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize;
     }
 
     }
 
-    private static int initialCapacity(final int requestedSize) {
+    private static int initialCapacity(final int requestedSize, final int maxChunkSize) {
         if (requestedSize < MIN_ARRAY_SIZE) {
             return MIN_ARRAY_SIZE;
         }
         if (requestedSize < MIN_ARRAY_SIZE) {
             return MIN_ARRAY_SIZE;
         }
-        if (requestedSize > MAX_ARRAY_SIZE) {
-            return MAX_ARRAY_SIZE;
+        if (requestedSize > maxChunkSize) {
+            return maxChunkSize;
         }
         return ceilingPowerOfTwo(requestedSize);
     }
         }
         return ceilingPowerOfTwo(requestedSize);
     }