/* * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore.persisted; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static com.google.common.math.IntMath.ceilingPowerOfTwo; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList.Builder; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Deque; import java.util.Iterator; import org.opendaylight.yangtools.concepts.Variant; /** * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying). * *

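* This class takes a different approach: it recognizes that the result of buffering will be collected at some point,
* when the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
* <ul>
*   <li>Data acquisition, during which a power-of-two sized chunk is filled up. Once the chunk is full, it is stashed
*       and a new chunk of twice its size is allocated, until chunks reach {@code MAX_ARRAY_SIZE}, at which point
*       they stop growing. Intermediate sizes may be skipped when data arrives in bulk via
*       {@code write(byte[], int, int)}.</li>
*   <li>Buffer consolidation, which occurs when the stream is closed. At this point the final collection of chunks
*       is constructed.</li>
* </ul>
*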

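* The data acquisition strategy results in predictably-sized buffers, which grow exponentially in size until
* they hit the maximum size. An intrinsic property here is that the total capacity of chunks created during the ramp-up
* is guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
* replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
* guarantees the total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of
* produced chunks is also well-controlled:
* <ul>
*   <li>if the total size does not exceed {@code MAX_ARRAY_SIZE}, the result is a single {@code byte[]}</li>
*   <li>otherwise the ramp-up chunks are merged into one leading chunk, which is followed by
*       {@code MAX_ARRAY_SIZE}-sized chunks and a final chunk trimmed to the number of bytes it actually holds</li>
* </ul>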
*
* @author Robert Varga
* @author Tomas Olvecky
*/
final class ChunkedOutputStream extends OutputStream {
    /**
     * Upper bound on the size of a single chunk. The value is read from the
     * {@code org.opendaylight.controller.cluster.datastore.persisted.max-array-size} system property (defaulting to
     * {@code 256 * 1024}) and rounded up to the nearest power of two.
     */
    static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
        "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
    private static final int MIN_ARRAY_SIZE = 32;

    // Set by close(): either a byte[] or an ImmutableList<byte[]>. Non-null means the stream is closed.
    private Object result;
    // Lazily-allocated to reduce pressure for single-chunk streams. Every chunk stored here is completely full.
    private Deque<byte[]> prevChunks;

    private byte[] currentChunk;
    private int currentOffset;
    private int size;

    /**
     * Create a new stream.
     *
     * @param requestedInitialCapacity requested size of the first chunk; it is clamped to the supported range and
     *                                 rounded up to a power of two
     */
    ChunkedOutputStream(final int requestedInitialCapacity) {
        currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
    }

    /**
     * Append a single byte.
     *
     * @param b byte to append, only the low eight bits are used
     * @throws IOException if this stream has already been closed
     */
    @Override
    @SuppressWarnings("checkstyle:ParameterName")
    public void write(final int b) throws IOException {
        checkNotClosed();
        ensureOneByte();
        currentChunk[currentOffset] = (byte) b;
        currentOffset++;
        size++;
    }

    /**
     * Append {@code len} bytes from {@code b}, starting at {@code off}, allocating further chunks as needed.
     *
     * @param b source array
     * @param off offset of the first byte to append
     * @param len number of bytes to append
     * @throws NullPointerException if {@code b} is null
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or the range exceeds {@code b}
     * @throws IOException if this stream has already been closed
     */
    @Override
    @SuppressWarnings("checkstyle:ParameterName")
    public void write(final byte[] b, final int off, final int len) throws IOException {
        // Validate the complete range up front: the copy below proceeds chunk by chunk, hence relying on
        // System.arraycopy() to detect a bad range could leave a partial write behind.
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException("Invalid range: off=" + off + " len=" + len
                + " length=" + b.length);
        }
        checkNotClosed();

        int fromOffset = off;
        int toCopy = len;

        while (toCopy != 0) {
            final int count = ensureMoreBytes(toCopy);
            System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
            currentOffset += count;
            size += count;
            fromOffset += count;
            toCopy -= count;
        }
    }

    /**
     * Close this stream, consolidating the collected chunks into the final result. Subsequent invocations have
     * no effect.
     */
    @Override
    public void close() {
        if (result == null) {
            result = computeResult();
            // The result holds everything we need, release working state
            prevChunks = null;
            currentChunk = null;
        }
    }

    /**
     * Return the number of bytes written so far.
     *
     * @return number of bytes written
     */
    int size() {
        return size;
    }

    /**
     * Return the collected data as a {@link ChunkedByteArray}.
     *
     * @return the collected data
     * @throws IllegalStateException if this stream has not been closed yet
     */
    ChunkedByteArray toChunkedByteArray() {
        checkClosed();
        final ImmutableList<byte[]> chunks = result instanceof byte[] ? ImmutableList.of((byte[]) result)
            : resultChunks();
        return new ChunkedByteArray(size, chunks);
    }

    /**
     * Return the collected data either as a plain {@code byte[]}, if it ended up in a single array, or as
     * a {@link ChunkedByteArray} otherwise.
     *
     * @return the collected data
     * @throws IllegalStateException if this stream has not been closed yet
     */
    Variant<byte[], ChunkedByteArray> toVariant() {
        checkClosed();
        return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
            : Variant.ofSecond(new ChunkedByteArray(size, resultChunks()));
    }

    // computeResult() produces either a byte[] or an ImmutableList<byte[]>; callers rule out the former
    @SuppressWarnings("unchecked")
    private ImmutableList<byte[]> resultChunks() {
        return (ImmutableList<byte[]>) result;
    }

    // Consolidate prevChunks and currentChunk into either a single byte[] or an ImmutableList<byte[]>
    private Object computeResult() {
        if (prevChunks == null) {
            // Simple case: it's only the current buffer, return that
            return trimChunk(currentChunk, currentOffset);
        }
        if (size <= MAX_ARRAY_SIZE) {
            // We have collected no more than a full chunk of data, let's have just one chunk ...
            if (currentOffset == 0 && prevChunks.size() == 1) {
                // ... which we have readily available
                return prevChunks.getFirst();
            }

            // ... which we need to collect
            final byte[] singleChunk = new byte[size];
            int offset = 0;
            for (byte[] chunk : prevChunks) {
                System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
                offset += chunk.length;
            }
            System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
            return singleChunk;
        }

        // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
        // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
        // end up trimming this array.
        int headSize = 0;
        int headCount = 0;
        final Iterator<byte[]> it = prevChunks.iterator();
        do {
            final byte[] chunk = it.next();
            if (chunk.length == MAX_ARRAY_SIZE) {
                break;
            }

            headSize += chunk.length;
            headCount++;
        } while (it.hasNext());

        // Compact initial chunks into a single one. This is pointless for a lone ramp-up chunk, which is already full,
        // and when there are no ramp-up chunks at all it would only prepend a zero-length chunk to the result.
        if (headCount > 1) {
            final byte[] head = new byte[headSize];
            int offset = 0;
            for (int i = 0; i < headCount; ++i) {
                final byte[] chunk = prevChunks.removeFirst();
                System.arraycopy(chunk, 0, head, offset, chunk.length);
                offset += chunk.length;
            }
            verify(offset == head.length);
            prevChunks.addFirst(head);
        }

        // Now append the current chunk if need be, potentially trimming it
        if (currentOffset == 0) {
            return ImmutableList.copyOf(prevChunks);
        }

        final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
        builder.addAll(prevChunks);
        builder.add(trimChunk(currentChunk, currentOffset));
        return builder.build();
    }

    // Ensure a single byte
    private void ensureOneByte() {
        if (currentChunk.length == currentOffset) {
            nextChunk(nextChunkSize(currentChunk.length));
        }
    }

    // Ensure more than one byte, returns the number of bytes available
    private int ensureMoreBytes(final int requested) {
        int available = currentChunk.length - currentOffset;
        if (available == 0) {
            nextChunk(nextChunkSize(currentChunk.length, requested));
            available = currentChunk.length;
        }
        final int count = Math.min(requested, available);
        verify(count > 0);
        return count;
    }

    // Stash the (full) current chunk and start a fresh one of specified size
    private void nextChunk(final int chunkSize) {
        if (prevChunks == null) {
            prevChunks = new ArrayDeque<>();
        }

        prevChunks.addLast(currentChunk);
        currentChunk = new byte[chunkSize];
        currentOffset = 0;
    }

    private void checkClosed() {
        checkState(result != null, "Stream has not been closed yet");
    }

    private void checkNotClosed() throws IOException {
        if (result != null) {
            throw new IOException("Stream is already closed");
        }
    }

    // Size of the next chunk for a bulk write: at least double the current size, but large enough to hold the
    // requested number of bytes, capped at MAX_ARRAY_SIZE
    private static int nextChunkSize(final int currentSize, final int requested) {
        return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE ? MAX_ARRAY_SIZE
            : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
    }

    // Size of the next chunk for a single-byte write: double the current size, capped at MAX_ARRAY_SIZE
    private static int nextChunkSize(final int currentSize) {
        return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
    }

    // Clamp the requested size to [MIN_ARRAY_SIZE, MAX_ARRAY_SIZE] and round it up to a power of two
    private static int initialCapacity(final int requestedSize) {
        if (requestedSize < MIN_ARRAY_SIZE) {
            return MIN_ARRAY_SIZE;
        }
        if (requestedSize > MAX_ARRAY_SIZE) {
            return MAX_ARRAY_SIZE;
        }
        return ceilingPowerOfTwo(requestedSize);
    }

    // Return the chunk itself if it is completely used, otherwise a copy truncated to the used length
    private static byte[] trimChunk(final byte[] chunk, final int length) {
        return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);
    }
}