Rehost Chunked{ByteArray,InputStream,OutputStream} 99/93099/5
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 16 Oct 2020 15:35:39 +0000 (17:35 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 16 Oct 2020 17:15:55 +0000 (19:15 +0200)
These utility classes are immensely useful for any fragmentation
workload. Rehost them so we can reuse them. This means we will end
up with non-constant maximum chunk size, but that's fine.

JIRA: CONTROLLER-1954
Change-Id: I046ddb16d1e5c7210a781d63f302c3ee3e75742d
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedByteArray.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java with 74% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedInputStream.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java with 97% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedOutputStream.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java with 80% similarity]
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/ChunkedOutputStreamTest.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java with 94% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java

@@ -5,22 +5,27 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore.persisted;
+package org.opendaylight.controller.cluster.io;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteSink;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInput;
 import java.util.ArrayList;
 import java.util.List;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.opendaylight.yangtools.concepts.Immutable;
 
+@Beta
 @NonNullByDefault
-final class ChunkedByteArray implements Immutable {
+public final class ChunkedByteArray implements Immutable {
     private final ImmutableList<byte[]> chunks;
     private final int size;
 
@@ -29,7 +34,7 @@ final class ChunkedByteArray implements Immutable {
         this.chunks = requireNonNull(chunks);
     }
 
-    static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize)
+    public static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize)
             throws IOException {
         final List<byte[]> chunks = new ArrayList<>(requiredChunks(size, chunkSize));
         int remaining = size;
@@ -43,25 +48,32 @@ final class ChunkedByteArray implements Immutable {
         return new ChunkedByteArray(size, ImmutableList.copyOf(chunks));
     }
 
-    int size() {
+    public int size() {
         return size;
     }
 
-    ChunkedInputStream openStream() {
+    public InputStream openStream() {
         return new ChunkedInputStream(size, chunks.iterator());
     }
 
-    void copyTo(final DataOutput output) throws IOException {
+    public void copyTo(final DataOutput output) throws IOException {
         for (byte[] chunk : chunks) {
             output.write(chunk, 0, chunk.length);
         }
     }
 
+    public void copyTo(final ByteSink output) throws IOException {
+        for (byte[] chunk : chunks) {
+            output.write(chunk);
+        }
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this).add("size", size).add("chunkCount", chunks.size()).toString();
     }
 
+    @VisibleForTesting
     ImmutableList<byte[]> getChunks() {
         return chunks;
     }
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore.persisted;
+package org.opendaylight.controller.cluster.io;
 
 import static java.util.Objects.requireNonNull;
 
@@ -5,12 +5,16 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore.persisted;
+package org.opendaylight.controller.cluster.io;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
+import static com.google.common.math.IntMath.isPowerOfTwo;
 
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableList.Builder;
 import java.io.ByteArrayOutputStream;
@@ -33,7 +37,7 @@ import org.opendaylight.yangtools.concepts.Variant;
  * <ul>
  *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
  *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
- *       {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
+ *       {@code maxChunkSize}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
  *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
  *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
  *       final collection of buffers.</li>
@@ -42,10 +46,10 @@ import org.opendaylight.yangtools.concepts.Variant;
  * <p>
  * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
  * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
- * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
- * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
- * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
- * chunks is also well-controlled:
+ * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which replaces
+ * them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm guarantees total
+ * number of internal copy operations is capped at {@code 2 * maxChunkSize}. The number of produced chunks is also
+ * well-controlled:
  * <ul>
  *   <li>for slowly-built data, we will maintain perfect packing</li>
  *   <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
@@ -54,11 +58,12 @@ import org.opendaylight.yangtools.concepts.Variant;
  * @author Robert Varga
  * @author Tomas Olvecky
  */
-final class ChunkedOutputStream extends OutputStream {
-    static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
-        "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+@Beta
+public final class ChunkedOutputStream extends OutputStream {
     private static final int MIN_ARRAY_SIZE = 32;
 
+    private final int maxChunkSize;
+
     // byte[] or a List
     private Object result;
     // Lazily-allocated to reduce pressure for single-chunk streams
@@ -68,8 +73,11 @@ final class ChunkedOutputStream extends OutputStream {
     private int currentOffset;
     private int size;
 
-    ChunkedOutputStream(final int requestedInitialCapacity) {
-        currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
+    public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) {
+        checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize);
+        checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize);
+        this.maxChunkSize = maxChunkSize;
+        currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)];
     }
 
     @Override
@@ -112,20 +120,21 @@ final class ChunkedOutputStream extends OutputStream {
         }
     }
 
-    int size() {
+    public int size() {
         return size;
     }
 
-    ChunkedByteArray toChunkedByteArray() {
+    public Variant<byte[], ChunkedByteArray> toVariant() {
         checkClosed();
-        return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
-            : (ImmutableList<byte[]>) result);
+        return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
+                : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
     }
 
-    Variant<byte[], ChunkedByteArray> toVariant() {
+    @VisibleForTesting
+    ChunkedByteArray toChunkedByteArray() {
         checkClosed();
-        return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
-                : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
+        return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
+            : (ImmutableList<byte[]>) result);
     }
 
     private Object computeResult() {
@@ -133,7 +142,7 @@ final class ChunkedOutputStream extends OutputStream {
             // Simple case: it's only the current buffer, return that
             return trimChunk(currentChunk, currentOffset);
         }
-        if (size <= MAX_ARRAY_SIZE) {
+        if (size <= maxChunkSize) {
             // We have collected less than full chunk of data, let's have just one chunk ...
             final byte[] singleChunk;
             if (currentOffset == 0 && prevChunks.size() == 1) {
@@ -160,7 +169,7 @@ final class ChunkedOutputStream extends OutputStream {
         final Iterator<byte[]> it = prevChunks.iterator();
         do {
             final byte[] chunk = it.next();
-            if (chunk.length == MAX_ARRAY_SIZE) {
+            if (chunk.length == maxChunkSize) {
                 break;
             }
 
@@ -229,21 +238,21 @@ final class ChunkedOutputStream extends OutputStream {
         }
     }
 
-    private static int nextChunkSize(final int currentSize, final int requested) {
-        return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
-                ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
+    private int nextChunkSize(final int currentSize, final int requested) {
+        return currentSize == maxChunkSize || requested >= maxChunkSize
+                ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
     }
 
-    private static int nextChunkSize(final int currentSize) {
-        return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
+    private int nextChunkSize(final int currentSize) {
+        return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize;
     }
 
-    private static int initialCapacity(final int requestedSize) {
+    private static int initialCapacity(final int requestedSize, final int maxChunkSize) {
         if (requestedSize < MIN_ARRAY_SIZE) {
             return MIN_ARRAY_SIZE;
         }
-        if (requestedSize > MAX_ARRAY_SIZE) {
-            return MAX_ARRAY_SIZE;
+        if (requestedSize > maxChunkSize) {
+            return maxChunkSize;
         }
         return ceilingPowerOfTwo(requestedSize);
     }
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.cluster.datastore.persisted;
+package org.opendaylight.controller.cluster.io;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -16,8 +16,9 @@ import org.junit.Test;
 
 public class ChunkedOutputStreamTest {
     private static final int INITIAL_SIZE = 256;
+    private static final int MAX_ARRAY_SIZE = 256 * 1024;
 
-    private final ChunkedOutputStream stream = new ChunkedOutputStream(INITIAL_SIZE);
+    private final ChunkedOutputStream stream = new ChunkedOutputStream(INITIAL_SIZE, MAX_ARRAY_SIZE);
 
     @Test
     public void testBasicWrite() throws IOException {
@@ -63,7 +64,7 @@ public class ChunkedOutputStreamTest {
 
     @Test
     public void testTwoChunksWrite() throws IOException {
-        int size = ChunkedOutputStream.MAX_ARRAY_SIZE + 1;
+        int size = MAX_ARRAY_SIZE + 1;
         for (int i = 0; i < size; ++i) {
             stream.write(i);
         }
index 65a65ea620120bb0e37f0b29c1969b5af2b400de..29a328730aab031d8355dcee142746808f9411d1 100644 (file)
@@ -8,8 +8,8 @@
 package org.opendaylight.controller.cluster.datastore.persisted;
 
 import static com.google.common.base.Verify.verifyNotNull;
+import static com.google.common.math.IntMath.ceilingPowerOfTwo;
 import static java.util.Objects.requireNonNull;
-import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE;
 
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
@@ -29,6 +29,8 @@ import java.util.Map.Entry;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
+import org.opendaylight.controller.cluster.io.ChunkedByteArray;
+import org.opendaylight.controller.cluster.io.ChunkedOutputStream;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
 import org.opendaylight.yangtools.concepts.Variant;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
@@ -49,6 +51,9 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
     private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
+    private static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
+        "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+
     private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
 
     CommitTransactionPayload() {
@@ -58,7 +63,7 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload<Trans
     public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
             final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity)
                     throws IOException {
-        final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity);
+        final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE);
         try (DataOutputStream dos = new DataOutputStream(cos)) {
             transactionId.writeTo(dos);
             DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate);