From 88d921169a0ccd41339e5409bbe8e7db18597609 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 16 Oct 2020 17:35:39 +0200 Subject: [PATCH] Rehost Chunked{ByteArray,InputStream,OutputStream} These utility classes are immensely useful for any fragmentation workload. Rehost them so we can reuse them. This means we will end up with non-constant maximum chunk size, but that's fine. JIRA: CONTROLLER-1954 Change-Id: I046ddb16d1e5c7210a781d63f302c3ee3e75742d Signed-off-by: Robert Varga --- .../cluster/io}/ChunkedByteArray.java | 24 +++++-- .../cluster/io}/ChunkedInputStream.java | 2 +- .../cluster/io}/ChunkedOutputStream.java | 65 +++++++++++-------- .../cluster/io}/ChunkedOutputStreamTest.java | 7 +- .../persisted/CommitTransactionPayload.java | 9 ++- 5 files changed, 67 insertions(+), 40 deletions(-) rename opendaylight/md-sal/{sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted => sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io}/ChunkedByteArray.java (74%) rename opendaylight/md-sal/{sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted => sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io}/ChunkedInputStream.java (97%) rename opendaylight/md-sal/{sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted => sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io}/ChunkedOutputStream.java (80%) rename opendaylight/md-sal/{sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted => sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io}/ChunkedOutputStreamTest.java (94%) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedByteArray.java similarity index 74% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedByteArray.java index 73a7df9542..87f8a18aea 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedByteArray.java @@ -5,22 +5,27 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore.persisted; +package org.opendaylight.controller.cluster.io; import static java.util.Objects.requireNonNull; +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteSink; import java.io.DataOutput; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInput; import java.util.ArrayList; import java.util.List; import org.eclipse.jdt.annotation.NonNullByDefault; import org.opendaylight.yangtools.concepts.Immutable; +@Beta @NonNullByDefault -final class ChunkedByteArray implements Immutable { +public final class 
ChunkedByteArray implements Immutable { private final ImmutableList chunks; private final int size; @@ -29,7 +34,7 @@ final class ChunkedByteArray implements Immutable { this.chunks = requireNonNull(chunks); } - static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize) + public static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize) throws IOException { final List chunks = new ArrayList<>(requiredChunks(size, chunkSize)); int remaining = size; @@ -43,25 +48,32 @@ final class ChunkedByteArray implements Immutable { return new ChunkedByteArray(size, ImmutableList.copyOf(chunks)); } - int size() { + public int size() { return size; } - ChunkedInputStream openStream() { + public InputStream openStream() { return new ChunkedInputStream(size, chunks.iterator()); } - void copyTo(final DataOutput output) throws IOException { + public void copyTo(final DataOutput output) throws IOException { for (byte[] chunk : chunks) { output.write(chunk, 0, chunk.length); } } + public void copyTo(final ByteSink output) throws IOException { + for (byte[] chunk : chunks) { + output.write(chunk); + } + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("size", size).add("chunkCount", chunks.size()).toString(); } + @VisibleForTesting ImmutableList getChunks() { return chunks; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedInputStream.java similarity index 97% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedInputStream.java index 68a906ed54..b26126b744 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedInputStream.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore.persisted; +package org.opendaylight.controller.cluster.io; import static java.util.Objects.requireNonNull; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedOutputStream.java similarity index 80% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java rename to opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedOutputStream.java index 717c3d1bd9..8215e23ff3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedOutputStream.java @@ -5,12 +5,16 @@ * terms of the Eclipse Public License v1.0 which 
accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore.persisted; +package org.opendaylight.controller.cluster.io; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.math.IntMath.ceilingPowerOfTwo; +import static com.google.common.math.IntMath.isPowerOfTwo; +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import java.io.ByteArrayOutputStream; @@ -33,7 +37,7 @@ import org.opendaylight.yangtools.concepts.Variant; *
 * <ol>
 *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
 *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
- *       {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
+ *       {@code maxChunkSize}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
 *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
 *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
 *       final collection of buffers.</li>
@@ -42,10 +46,10 @@ import org.opendaylight.yangtools.concepts.Variant;
 *
 * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
 * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
- * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
- * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
- * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
- * chunks is also well-controlled:
+ * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which replaces
+ * them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm guarantees total
+ * number of internal copy operations is capped at {@code 2 * maxChunkSize}. The number of produced chunks is also
+ * well-controlled:
 * <ul>
 *   <li>for slowly-built data, we will maintain perfect packing</li>
 *   <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
    • @@ -54,11 +58,12 @@ import org.opendaylight.yangtools.concepts.Variant; * @author Robert Varga * @author Tomas Olvecky */ -final class ChunkedOutputStream extends OutputStream { - static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( - "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024)); +@Beta +public final class ChunkedOutputStream extends OutputStream { private static final int MIN_ARRAY_SIZE = 32; + private final int maxChunkSize; + // byte[] or a List private Object result; // Lazily-allocated to reduce pressure for single-chunk streams @@ -68,8 +73,11 @@ final class ChunkedOutputStream extends OutputStream { private int currentOffset; private int size; - ChunkedOutputStream(final int requestedInitialCapacity) { - currentChunk = new byte[initialCapacity(requestedInitialCapacity)]; + public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) { + checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize); + checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize); + this.maxChunkSize = maxChunkSize; + currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)]; } @Override @@ -112,20 +120,21 @@ final class ChunkedOutputStream extends OutputStream { } } - int size() { + public int size() { return size; } - ChunkedByteArray toChunkedByteArray() { + public Variant toVariant() { checkClosed(); - return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result) - : (ImmutableList) result); + return result instanceof byte[] ? Variant.ofFirst((byte[]) result) + : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList) result)); } - Variant toVariant() { + @VisibleForTesting + ChunkedByteArray toChunkedByteArray() { checkClosed(); - return result instanceof byte[] ? Variant.ofFirst((byte[]) result) - : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList) result)); + return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result) + : (ImmutableList) result); } private Object computeResult() { @@ -133,7 +142,7 @@ final class ChunkedOutputStream extends OutputStream { // Simple case: it's only the current buffer, return that return trimChunk(currentChunk, currentOffset); } - if (size <= MAX_ARRAY_SIZE) { + if (size <= maxChunkSize) { // We have collected less than full chunk of data, let's have just one chunk ... final byte[] singleChunk; if (currentOffset == 0 && prevChunks.size() == 1) { @@ -160,7 +169,7 @@ final class ChunkedOutputStream extends OutputStream { final Iterator it = prevChunks.iterator(); do { final byte[] chunk = it.next(); - if (chunk.length == MAX_ARRAY_SIZE) { + if (chunk.length == maxChunkSize) { break; } @@ -229,21 +238,21 @@ final class ChunkedOutputStream extends OutputStream { } } - private static int nextChunkSize(final int currentSize, final int requested) { - return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE - ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested)); + private int nextChunkSize(final int currentSize, final int requested) { + return currentSize == maxChunkSize || requested >= maxChunkSize + ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested)); } - private static int nextChunkSize(final int currentSize) { - return currentSize < MAX_ARRAY_SIZE ? 
currentSize * 2 : MAX_ARRAY_SIZE; + private int nextChunkSize(final int currentSize) { + return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize; } - private static int initialCapacity(final int requestedSize) { + private static int initialCapacity(final int requestedSize, final int maxChunkSize) { if (requestedSize < MIN_ARRAY_SIZE) { return MIN_ARRAY_SIZE; } - if (requestedSize > MAX_ARRAY_SIZE) { - return MAX_ARRAY_SIZE; + if (requestedSize > maxChunkSize) { + return maxChunkSize; } return ceilingPowerOfTwo(requestedSize); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/ChunkedOutputStreamTest.java similarity index 94% rename from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java rename to opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/ChunkedOutputStreamTest.java index 16f7bd830b..56c78f1c70 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/io/ChunkedOutputStreamTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.controller.cluster.datastore.persisted; +package org.opendaylight.controller.cluster.io; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -16,8 +16,9 @@ import org.junit.Test; public class ChunkedOutputStreamTest { private static final int INITIAL_SIZE = 256; + private static final int MAX_ARRAY_SIZE = 256 * 1024; - private final ChunkedOutputStream stream = new ChunkedOutputStream(INITIAL_SIZE); + private final ChunkedOutputStream stream = new ChunkedOutputStream(INITIAL_SIZE, MAX_ARRAY_SIZE); @Test public void testBasicWrite() throws IOException { @@ -63,7 +64,7 @@ public class ChunkedOutputStreamTest { @Test public void testTwoChunksWrite() throws IOException { - int size = ChunkedOutputStream.MAX_ARRAY_SIZE + 1; + int size = MAX_ARRAY_SIZE + 1; for (int i = 0; i < size; ++i) { stream.write(i); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index 65a65ea620..29a328730a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -8,8 +8,8 @@ package org.opendaylight.controller.cluster.datastore.persisted; import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.math.IntMath.ceilingPowerOfTwo; import static java.util.Objects.requireNonNull; -import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE; import 
com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; @@ -29,6 +29,8 @@ import java.util.Map.Entry; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; +import org.opendaylight.controller.cluster.io.ChunkedByteArray; +import org.opendaylight.controller.cluster.io.ChunkedOutputStream; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; import org.opendaylight.yangtools.concepts.Variant; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; @@ -49,6 +51,9 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload candidate = null; CommitTransactionPayload() { @@ -58,7 +63,7 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload
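
A minimal usage sketch (assumptions: the class name ChunkedIoExample, the 512-byte initial capacity and the Object-stream plumbing are illustrative only, and Variant's isFirst()/getFirst()/getSecond() accessors are taken to be the standard org.opendaylight.yangtools.concepts.Variant API). It mirrors how a consumer such as CommitTransactionPayload is expected to drive the rehosted classes after this change: write through a ChunkedOutputStream bounded by a power-of-two maxChunkSize, pick up the result via toVariant(), and rebuild a ChunkedByteArray on the read side with readFrom().

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.opendaylight.controller.cluster.io.ChunkedByteArray;
import org.opendaylight.controller.cluster.io.ChunkedOutputStream;
import org.opendaylight.yangtools.concepts.Variant;

// Hypothetical example class, not part of this change.
public final class ChunkedIoExample {
    public static void main(final String[] args) throws IOException {
        // The maximum chunk size must be a positive power of two, as the constructor's checkArgument() enforces.
        final int maxChunkSize = 256 * 1024;

        // Accumulate data in exponentially-growing chunks capped at maxChunkSize.
        final ChunkedOutputStream cos = new ChunkedOutputStream(512, maxChunkSize);
        for (int i = 0; i < 1_000_000; ++i) {
            cos.write(i);
        }
        cos.close();

        // Payloads of up to maxChunkSize come back as a single byte[], larger ones as a ChunkedByteArray.
        final Variant<byte[], ChunkedByteArray> variant = cos.toVariant();

        // Serialize: length first, then the raw bytes, whichever form we got.
        final ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeInt(cos.size());
            if (variant.isFirst()) {
                oos.write(variant.getFirst());
            } else {
                variant.getSecond().copyTo(oos);
            }
        }

        // Deserialize: rebuild the bytes as maxChunkSize-bounded chunks, then read them back as a plain InputStream.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            final int size = ois.readInt();
            final ChunkedByteArray array = ChunkedByteArray.readFrom(ois, size, maxChunkSize);
            System.out.println("restored " + array.size() + " bytes, first byte " + array.openStream().read());
        }
    }
}

Returning a Variant rather than always wrapping the result keeps payloads of up to maxChunkSize as a plain byte[], matching the single-chunk fast path in computeResult().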