Rehost Chunked{ByteArray,InputStream,OutputStream}
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / persisted / ChunkedOutputStream.java
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java
deleted file mode 100644 (file)
index 717c3d1..0000000
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.persisted;
-
-import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
-import static com.google.common.math.IntMath.ceilingPowerOfTwo;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableList.Builder;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.Iterator;
-import org.opendaylight.yangtools.concepts.Variant;
-
-/**
- * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has
- * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge
- * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying).
- *
- * <p>
- * This class takes a different approach: it recognizes that result of buffering will be collected at some point, when
- * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
- * <ul>
- *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
- *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
- *       {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
- *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
- *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
- *       final collection of buffers.</li>
- * </ul>
- *
- * <p>
- * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
- * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
- * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
- * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
- * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
- * chunks is also well-controlled:
- * <ul>
- *   <li>for slowly-built data, we will maintain perfect packing</li>
- *   <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
- * </ul>
- *
- * @author Robert Varga
- * @author Tomas Olvecky
- */
-final class ChunkedOutputStream extends OutputStream {
-    static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
-        "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
-    private static final int MIN_ARRAY_SIZE = 32;
-
-    // byte[] or a List
-    private Object result;
-    // Lazily-allocated to reduce pressure for single-chunk streams
-    private Deque<byte[]> prevChunks;
-
-    private byte[] currentChunk;
-    private int currentOffset;
-    private int size;
-
-    ChunkedOutputStream(final int requestedInitialCapacity) {
-        currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
-    }
-
-    @Override
-    @SuppressWarnings("checkstyle:ParameterName")
-    public void write(final int b) throws IOException {
-        checkNotClosed();
-        ensureOneByte();
-        currentChunk[currentOffset] = (byte) b;
-        currentOffset++;
-        size++;
-    }
-
-    @Override
-    @SuppressWarnings("checkstyle:ParameterName")
-    public void write(final byte[] b, final int off, final int len) throws IOException {
-        if (len < 0) {
-            throw new IndexOutOfBoundsException();
-        }
-        checkNotClosed();
-
-        int fromOffset = off;
-        int toCopy = len;
-
-        while (toCopy != 0) {
-            final int count = ensureMoreBytes(toCopy);
-            System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
-            currentOffset += count;
-            size += count;
-            fromOffset += count;
-            toCopy -= count;
-        }
-    }
-
-    @Override
-    public void close() {
-        if (result == null) {
-            result = computeResult();
-            prevChunks = null;
-            currentChunk = null;
-        }
-    }
-
-    int size() {
-        return size;
-    }
-
-    ChunkedByteArray toChunkedByteArray() {
-        checkClosed();
-        return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
-            : (ImmutableList<byte[]>) result);
-    }
-
-    Variant<byte[], ChunkedByteArray> toVariant() {
-        checkClosed();
-        return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
-                : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
-    }
-
-    private Object computeResult() {
-        if (prevChunks == null) {
-            // Simple case: it's only the current buffer, return that
-            return trimChunk(currentChunk, currentOffset);
-        }
-        if (size <= MAX_ARRAY_SIZE) {
-            // We have collected less than full chunk of data, let's have just one chunk ...
-            final byte[] singleChunk;
-            if (currentOffset == 0 && prevChunks.size() == 1) {
-                // ... which we have readily available
-                return prevChunks.getFirst();
-            }
-
-            // ... which we need to collect
-            singleChunk = new byte[size];
-            int offset = 0;
-            for (byte[] chunk : prevChunks) {
-                System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
-                offset += chunk.length;
-            }
-            System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
-            return singleChunk;
-        }
-
-        // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
-        // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
-        // end up trimming this array.
-        int headSize = 0;
-        int headCount = 0;
-        final Iterator<byte[]> it = prevChunks.iterator();
-        do {
-            final byte[] chunk = it.next();
-            if (chunk.length == MAX_ARRAY_SIZE) {
-                break;
-            }
-
-            headSize += chunk.length;
-            headCount++;
-        } while (it.hasNext());
-
-        // Compact initial chunks into a single one
-        final byte[] head = new byte[headSize];
-        int offset = 0;
-        for (int i = 0; i < headCount; ++i) {
-            final byte[] chunk = prevChunks.removeFirst();
-            System.arraycopy(chunk, 0, head, offset, chunk.length);
-            offset += chunk.length;
-        }
-        verify(offset == head.length);
-        prevChunks.addFirst(head);
-
-        // Now append the current chunk if need be, potentially trimming it
-        if (currentOffset == 0) {
-            return ImmutableList.copyOf(prevChunks);
-        }
-
-        final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
-        builder.addAll(prevChunks);
-        builder.add(trimChunk(currentChunk, currentOffset));
-        return builder.build();
-    }
-
-    // Ensure a single byte
-    private void ensureOneByte() {
-        if (currentChunk.length == currentOffset) {
-            nextChunk(nextChunkSize(currentChunk.length));
-        }
-    }
-
-    // Ensure more than one byte, returns the number of bytes available
-    private int ensureMoreBytes(final int requested) {
-        int available = currentChunk.length - currentOffset;
-        if (available == 0) {
-            nextChunk(nextChunkSize(currentChunk.length, requested));
-            available = currentChunk.length;
-        }
-        final int count = Math.min(requested, available);
-        verify(count > 0);
-        return count;
-    }
-
-    private void nextChunk(final int chunkSize) {
-        if (prevChunks == null) {
-            prevChunks = new ArrayDeque<>();
-        }
-
-        prevChunks.addLast(currentChunk);
-        currentChunk = new byte[chunkSize];
-        currentOffset = 0;
-    }
-
-    private void checkClosed() {
-        checkState(result != null, "Stream has not been closed yet");
-    }
-
-    private void checkNotClosed() throws IOException {
-        if (result != null) {
-            throw new IOException("Stream is already closed");
-        }
-    }
-
-    private static int nextChunkSize(final int currentSize, final int requested) {
-        return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
-                ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
-    }
-
-    private static int nextChunkSize(final int currentSize) {
-        return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
-    }
-
-    private static int initialCapacity(final int requestedSize) {
-        if (requestedSize < MIN_ARRAY_SIZE) {
-            return MIN_ARRAY_SIZE;
-        }
-        if (requestedSize > MAX_ARRAY_SIZE) {
-            return MAX_ARRAY_SIZE;
-        }
-        return ceilingPowerOfTwo(requestedSize);
-    }
-
-    private static byte[] trimChunk(final byte[] chunk, final int length) {
-        return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);
-    }
-}