X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fio%2FChunkedOutputStream.java;fp=opendaylight%2Fmd-sal%2Fsal-clustering-commons%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fio%2FChunkedOutputStream.java;h=8215e23ff371d290846a3e2d75528847219c2e0d;hb=88d921169a0ccd41339e5409bbe8e7db18597609;hp=0000000000000000000000000000000000000000;hpb=255e74efd633f2fbca7ce4f1372004d93cc81a10;p=controller.git diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedOutputStream.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedOutputStream.java new file mode 100644 index 0000000000..8215e23ff3 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/io/ChunkedOutputStream.java @@ -0,0 +1,263 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.io; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static com.google.common.math.IntMath.ceilingPowerOfTwo; +import static com.google.common.math.IntMath.isPowerOfTwo; + +import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.Iterator; +import org.opendaylight.yangtools.concepts.Variant; + +/** + * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has + * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge + * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying). + * + *

+ * This class takes a different approach: it recognizes that result of buffering will be collected at some point, when + * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps: + *

+ * + *

+ * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until + * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is + * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which replaces + * them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm guarantees total + * number of internal copy operations is capped at {@code 2 * maxChunkSize}. The number of produced chunks is also + * well-controlled: + *

+ * + * @author Robert Varga + * @author Tomas Olvecky + */ +@Beta +public final class ChunkedOutputStream extends OutputStream { + private static final int MIN_ARRAY_SIZE = 32; + + private final int maxChunkSize; + + // byte[] or a List + private Object result; + // Lazily-allocated to reduce pressure for single-chunk streams + private Deque prevChunks; + + private byte[] currentChunk; + private int currentOffset; + private int size; + + public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) { + checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize); + checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize); + this.maxChunkSize = maxChunkSize; + currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)]; + } + + @Override + @SuppressWarnings("checkstyle:ParameterName") + public void write(final int b) throws IOException { + checkNotClosed(); + ensureOneByte(); + currentChunk[currentOffset] = (byte) b; + currentOffset++; + size++; + } + + @Override + @SuppressWarnings("checkstyle:ParameterName") + public void write(final byte[] b, final int off, final int len) throws IOException { + if (len < 0) { + throw new IndexOutOfBoundsException(); + } + checkNotClosed(); + + int fromOffset = off; + int toCopy = len; + + while (toCopy != 0) { + final int count = ensureMoreBytes(toCopy); + System.arraycopy(b, fromOffset, currentChunk, currentOffset, count); + currentOffset += count; + size += count; + fromOffset += count; + toCopy -= count; + } + } + + @Override + public void close() { + if (result == null) { + result = computeResult(); + prevChunks = null; + currentChunk = null; + } + } + + public int size() { + return size; + } + + public Variant toVariant() { + checkClosed(); + return result instanceof byte[] ? Variant.ofFirst((byte[]) result) + : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList) result)); + } + + @VisibleForTesting + ChunkedByteArray toChunkedByteArray() { + checkClosed(); + return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result) + : (ImmutableList) result); + } + + private Object computeResult() { + if (prevChunks == null) { + // Simple case: it's only the current buffer, return that + return trimChunk(currentChunk, currentOffset); + } + if (size <= maxChunkSize) { + // We have collected less than full chunk of data, let's have just one chunk ... + final byte[] singleChunk; + if (currentOffset == 0 && prevChunks.size() == 1) { + // ... which we have readily available + return prevChunks.getFirst(); + } + + // ... which we need to collect + singleChunk = new byte[size]; + int offset = 0; + for (byte[] chunk : prevChunks) { + System.arraycopy(chunk, 0, singleChunk, offset, chunk.length); + offset += chunk.length; + } + System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset); + return singleChunk; + } + + // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE, + // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to + // end up trimming this array. + int headSize = 0; + int headCount = 0; + final Iterator it = prevChunks.iterator(); + do { + final byte[] chunk = it.next(); + if (chunk.length == maxChunkSize) { + break; + } + + headSize += chunk.length; + headCount++; + } while (it.hasNext()); + + // Compact initial chunks into a single one + final byte[] head = new byte[headSize]; + int offset = 0; + for (int i = 0; i < headCount; ++i) { + final byte[] chunk = prevChunks.removeFirst(); + System.arraycopy(chunk, 0, head, offset, chunk.length); + offset += chunk.length; + } + verify(offset == head.length); + prevChunks.addFirst(head); + + // Now append the current chunk if need be, potentially trimming it + if (currentOffset == 0) { + return ImmutableList.copyOf(prevChunks); + } + + final Builder builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1); + builder.addAll(prevChunks); + builder.add(trimChunk(currentChunk, currentOffset)); + return builder.build(); + } + + // Ensure a single byte + private void ensureOneByte() { + if (currentChunk.length == currentOffset) { + nextChunk(nextChunkSize(currentChunk.length)); + } + } + + // Ensure more than one byte, returns the number of bytes available + private int ensureMoreBytes(final int requested) { + int available = currentChunk.length - currentOffset; + if (available == 0) { + nextChunk(nextChunkSize(currentChunk.length, requested)); + available = currentChunk.length; + } + final int count = Math.min(requested, available); + verify(count > 0); + return count; + } + + private void nextChunk(final int chunkSize) { + if (prevChunks == null) { + prevChunks = new ArrayDeque<>(); + } + + prevChunks.addLast(currentChunk); + currentChunk = new byte[chunkSize]; + currentOffset = 0; + } + + private void checkClosed() { + checkState(result != null, "Stream has not been closed yet"); + } + + private void checkNotClosed() throws IOException { + if (result != null) { + throw new IOException("Stream is already closed"); + } + } + + private int nextChunkSize(final int currentSize, final int requested) { + return currentSize == maxChunkSize || requested >= maxChunkSize + ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested)); + } + + private int nextChunkSize(final int currentSize) { + return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize; + } + + private static int initialCapacity(final int requestedSize, final int maxChunkSize) { + if (requestedSize < MIN_ARRAY_SIZE) { + return MIN_ARRAY_SIZE; + } + if (requestedSize > maxChunkSize) { + return maxChunkSize; + } + return ceilingPowerOfTwo(requestedSize); + } + + private static byte[] trimChunk(final byte[] chunk, final int length) { + return chunk.length == length ? chunk : Arrays.copyOf(chunk, length); + } +}