/* * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.math.IntMath.ceilingPowerOfTwo; import static com.google.common.math.IntMath.isPowerOfTwo; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Deque; import java.util.Iterator; import org.opendaylight.yangtools.concepts.Variant; /** * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying). * *

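* <p>
 * This class takes a different approach: it recognizes that the result of buffering will be collected at some point,
 * when the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
 * <ul>
 *   <li>Data acquisition, during which we start with an initial (power-of-two) sized buffer and proceed to fill it
 *       up. Once the buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once
 *       we hit {@code maxChunkSize}, we do not grow subsequent buffers. Intermediate sizes may be skipped when data
 *       is introduced in large pieces via {@link #write(byte[], int, int)}.</li>
 *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct
 *       the final collection of buffers.</li>
 * </ul>
 *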

* *

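* <p>
 * The data acquisition strategy results in predictably-sized buffers, which grow exponentially in size until they hit
 * the maximum size. An intrinsic property here is that the total capacity of the chunks created during the ramp-up is
 * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which
 * replaces them. Combined with the requirement to trim the last buffer to its accurate length, this algorithm
 * guarantees that the total number of bytes copied internally is capped at {@code 2 * maxChunkSize}. The number of
 * produced chunks is also well-controlled:
 * <ul>
 *   <li>if no more than {@code maxChunkSize} bytes were written, the result is a single {@code byte[]}</li>
 *   <li>otherwise the result is a single chunk holding everything collected during the ramp-up, followed by
 *       full-sized chunks, with only the last chunk trimmed to the amount of data it actually holds</li>
 * </ul>
 *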

*
 * @author Robert Varga
 * @author Tomas Olvecky
 */
@Beta
public final class ChunkedOutputStream extends OutputStream {
    /** Smallest chunk this stream allocates on its own, unless {@code maxChunkSize} is smaller still. */
    private static final int MIN_ARRAY_SIZE = 32;

    private final int maxChunkSize;

    // Null while the stream is open; a byte[] or an ImmutableList<byte[]> once it has been closed
    private Object result;
    // Lazily-allocated to reduce pressure for single-chunk streams
    private Deque<byte[]> prevChunks;

    private byte[] currentChunk;
    private int currentOffset;
    private int size;

    /**
     * Create a new stream.
     *
     * @param requestedInitialCapacity hint for the size of the first chunk; it is rounded up to a power of two and
     *                                 clamped so that it never exceeds {@code maxChunkSize}
     * @param maxChunkSize maximum size of a single chunk, must be a positive power of two
     * @throws IllegalArgumentException if {@code maxChunkSize} is not a positive power of two
     */
    public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) {
        // Positivity goes first: isPowerOfTwo() rejects non-positive values too, which would make this check dead
        checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize);
        checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize);
        this.maxChunkSize = maxChunkSize;
        currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)];
    }

    /**
     * Append a single byte.
     *
     * @param b the byte to append, only the low-order eight bits are stored
     * @throws IOException if this stream has already been closed
     */
    @Override
    @SuppressWarnings("checkstyle:ParameterName")
    public void write(final int b) throws IOException {
        checkNotClosed();
        ensureOneByte();
        currentChunk[currentOffset] = (byte) b;
        currentOffset++;
        size++;
    }

    /**
     * Append {@code len} bytes of {@code b}, starting at {@code off}.
     *
     * @param b source array
     * @param off offset of the first byte to append
     * @param len number of bytes to append
     * @throws NullPointerException if {@code b} is null
     * @throws IndexOutOfBoundsException if {@code off} and {@code len} do not describe a range within {@code b}
     * @throws IOException if this stream has already been closed
     */
    @Override
    @SuppressWarnings("checkstyle:ParameterName")
    public void write(final byte[] b, final int off, final int len) throws IOException {
        // Validate the complete range up front, so that a bad request cannot leave partially-appended data behind.
        // The subtraction form cannot overflow, unlike off + len.
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException(
                "Invalid range offset=" + off + " length=" + len + " for array of " + b.length + " bytes");
        }
        checkNotClosed();

        int fromOffset = off;
        int toCopy = len;

        while (toCopy != 0) {
            // Copy as much as fits into the current chunk, allocating the next chunk when the current one is full
            final int count = ensureMoreBytes(toCopy);
            System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
            currentOffset += count;
            size += count;
            fromOffset += count;
            toCopy -= count;
        }
    }

    /**
     * Close this stream, consolidating collected chunks into the final result. Closing an already-closed stream has
     * no effect.
     */
    @Override
    public void close() {
        if (result == null) {
            result = computeResult();
            // The working buffers are either part of the result or have been copied into it
            prevChunks = null;
            currentChunk = null;
        }
    }

    /**
     * Return the number of bytes written to this stream so far.
     *
     * @return number of bytes written
     */
    public int size() {
        return size;
    }

    /**
     * Return the collected data, either as a single {@code byte[]} or as a {@link ChunkedByteArray}.
     *
     * @return a {@link Variant} holding the collected data
     * @throws IllegalStateException if this stream has not been closed yet
     */
    public Variant<byte[], ChunkedByteArray> toVariant() {
        checkClosed();
        if (result instanceof byte[]) {
            return Variant.ofFirst((byte[]) result);
        }
        return Variant.ofSecond(new ChunkedByteArray(size, chunkList()));
    }

    /**
     * Return the collected data as a {@link ChunkedByteArray}, wrapping a single-array result in a one-element list.
     *
     * @return a {@link ChunkedByteArray} holding the collected data
     * @throws IllegalStateException if this stream has not been closed yet
     */
    @VisibleForTesting
    ChunkedByteArray toChunkedByteArray() {
        checkClosed();
        final ImmutableList<byte[]> chunks;
        if (result instanceof byte[]) {
            chunks = ImmutableList.of((byte[]) result);
        } else {
            chunks = chunkList();
        }
        return new ChunkedByteArray(size, chunks);
    }

    // View the result as a list of chunks. Only valid when the result is not a byte[].
    @SuppressWarnings("unchecked")
    private ImmutableList<byte[]> chunkList() {
        // Safe: computeResult() only ever produces a byte[] or an ImmutableList<byte[]>
        return (ImmutableList<byte[]>) result;
    }

    // Consolidate collected chunks: produces either a byte[] or an ImmutableList<byte[]>
    private Object computeResult() {
        if (prevChunks == null) {
            // Simple case: it's only the current buffer, return that
            return trimChunk(currentChunk, currentOffset);
        }
        if (size <= maxChunkSize) {
            // We have collected less than full chunk of data, let's have just one chunk ...
            if (currentOffset == 0 && prevChunks.size() == 1) {
                // ... which we have readily available
                return prevChunks.getFirst();
            }

            // ... which we need to collect
            final byte[] singleChunk = new byte[size];
            int offset = 0;
            for (byte[] chunk : prevChunks) {
                System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
                offset += chunk.length;
            }
            System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
            return singleChunk;
        }

        // Determine number of chunks to aggregate and their required storage. Normally storage would be
        // maxChunkSize, but we can have faster-than-exponential startup, which ends up needing less storage -- and
        // we do not want to end up trimming this array.
        int headSize = 0;
        int headCount = 0;
        final Iterator<byte[]> it = prevChunks.iterator();
        do {
            final byte[] chunk = it.next();
            if (chunk.length == maxChunkSize) {
                break;
            }

            headSize += chunk.length;
            headCount++;
        } while (it.hasNext());

        // Compact initial chunks into a single one. With no ramp-up chunk there is nothing to compact (and we must
        // not introduce an empty chunk), while a lone ramp-up chunk is already in its final form.
        if (headCount > 1) {
            final byte[] head = new byte[headSize];
            int offset = 0;
            for (int i = 0; i < headCount; ++i) {
                final byte[] chunk = prevChunks.removeFirst();
                System.arraycopy(chunk, 0, head, offset, chunk.length);
                offset += chunk.length;
            }
            verify(offset == head.length);
            prevChunks.addFirst(head);
        }

        // Now append the current chunk if need be, potentially trimming it
        if (currentOffset == 0) {
            return ImmutableList.copyOf(prevChunks);
        }

        final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
        builder.addAll(prevChunks);
        builder.add(trimChunk(currentChunk, currentOffset));
        return builder.build();
    }

    // Ensure a single byte
    private void ensureOneByte() {
        if (currentChunk.length == currentOffset) {
            nextChunk(nextChunkSize(currentChunk.length));
        }
    }

    // Ensure more than one byte, returns the number of bytes available
    private int ensureMoreBytes(final int requested) {
        int available = currentChunk.length - currentOffset;
        if (available == 0) {
            nextChunk(nextChunkSize(currentChunk.length, requested));
            available = currentChunk.length;
        }
        final int count = Math.min(requested, available);
        verify(count > 0);
        return count;
    }

    // Stash the current chunk and start a fresh one of specified size
    private void nextChunk(final int chunkSize) {
        if (prevChunks == null) {
            prevChunks = new ArrayDeque<>();
        }

        prevChunks.addLast(currentChunk);
        currentChunk = new byte[chunkSize];
        currentOffset = 0;
    }

    private void checkClosed() {
        checkState(result != null, "Stream has not been closed yet");
    }

    private void checkNotClosed() throws IOException {
        if (result != null) {
            throw new IOException("Stream is already closed");
        }
    }

    // Size of the next chunk when 'requested' bytes are pending: at least double the current size, but jump
    // straight to a size accommodating the request (capped at maxChunkSize) when that is larger
    private int nextChunkSize(final int currentSize, final int requested) {
        return currentSize == maxChunkSize || requested >= maxChunkSize
                ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
    }

    // Size of the next chunk for byte-at-a-time writes: double the current size until maxChunkSize is reached
    private int nextChunkSize(final int currentSize) {
        return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize;
    }

    // Size of the first chunk: a power of two which never exceeds maxChunkSize
    private static int initialCapacity(final int requestedSize, final int maxChunkSize) {
        if (requestedSize >= maxChunkSize) {
            return maxChunkSize;
        }
        if (requestedSize < MIN_ARRAY_SIZE) {
            // maxChunkSize may legitimately be smaller than our preferred minimum
            return Math.min(MIN_ARRAY_SIZE, maxChunkSize);
        }
        return ceilingPowerOfTwo(requestedSize);
    }

    // Return the chunk itself if it is completely filled, otherwise a copy holding just the first 'length' bytes
    private static byte[] trimChunk(final byte[] chunk, final int length) {
        return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);
    }
}