Split up transaction chunks
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / persisted / ChunkedOutputStream.java
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java
new file mode 100644 (file)
index 0000000..717c3d1
--- /dev/null
@@ -0,0 +1,254 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.math.IntMath.ceilingPowerOfTwo;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.Iterator;
+import org.opendaylight.yangtools.concepts.Variant;
+
+/**
+ * An {@link OutputStream} implementation which collects data is a series of {@code byte[]} chunks, each of which has
+ * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge
+ * byte arrays -- which can create unnecessary pressure on the GC (as well as lot of copying).
+ *
+ * <p>
+ * This class takes a different approach: it recognizes that result of buffering will be collected at some point, when
+ * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
+ * <ul>
+ *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
+ *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
+ *       {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
+ *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
+ *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
+ *       final collection of buffers.</li>
+ * </ul>
+ *
+ * <p>
+ * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
+ * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
+ * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
+ * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
+ * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
+ * chunks is also well-controlled:
+ * <ul>
+ *   <li>for slowly-built data, we will maintain perfect packing</li>
+ *   <li>for fast-startup data, we will be at most one one chunk away from packing perfectly</li>
+ * </ul>
+ *
+ * @author Robert Varga
+ * @author Tomas Olvecky
+ */
+final class ChunkedOutputStream extends OutputStream {
+    static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
+        "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+    private static final int MIN_ARRAY_SIZE = 32;
+
+    // byte[] or a List
+    private Object result;
+    // Lazily-allocated to reduce pressure for single-chunk streams
+    private Deque<byte[]> prevChunks;
+
+    private byte[] currentChunk;
+    private int currentOffset;
+    private int size;
+
+    ChunkedOutputStream(final int requestedInitialCapacity) {
+        currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:ParameterName")
+    public void write(final int b) throws IOException {
+        checkNotClosed();
+        ensureOneByte();
+        currentChunk[currentOffset] = (byte) b;
+        currentOffset++;
+        size++;
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:ParameterName")
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        if (len < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+        checkNotClosed();
+
+        int fromOffset = off;
+        int toCopy = len;
+
+        while (toCopy != 0) {
+            final int count = ensureMoreBytes(toCopy);
+            System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
+            currentOffset += count;
+            size += count;
+            fromOffset += count;
+            toCopy -= count;
+        }
+    }
+
+    @Override
+    public void close() {
+        if (result == null) {
+            result = computeResult();
+            prevChunks = null;
+            currentChunk = null;
+        }
+    }
+
+    int size() {
+        return size;
+    }
+
+    ChunkedByteArray toChunkedByteArray() {
+        checkClosed();
+        return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
+            : (ImmutableList<byte[]>) result);
+    }
+
+    Variant<byte[], ChunkedByteArray> toVariant() {
+        checkClosed();
+        return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
+                : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
+    }
+
+    private Object computeResult() {
+        if (prevChunks == null) {
+            // Simple case: it's only the current buffer, return that
+            return trimChunk(currentChunk, currentOffset);
+        }
+        if (size <= MAX_ARRAY_SIZE) {
+            // We have collected less than full chunk of data, let's have just one chunk ...
+            final byte[] singleChunk;
+            if (currentOffset == 0 && prevChunks.size() == 1) {
+                // ... which we have readily available
+                return prevChunks.getFirst();
+            }
+
+            // ... which we need to collect
+            singleChunk = new byte[size];
+            int offset = 0;
+            for (byte[] chunk : prevChunks) {
+                System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
+                offset += chunk.length;
+            }
+            System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
+            return singleChunk;
+        }
+
+        // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
+        // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
+        // end up trimming this array.
+        int headSize = 0;
+        int headCount = 0;
+        final Iterator<byte[]> it = prevChunks.iterator();
+        do {
+            final byte[] chunk = it.next();
+            if (chunk.length == MAX_ARRAY_SIZE) {
+                break;
+            }
+
+            headSize += chunk.length;
+            headCount++;
+        } while (it.hasNext());
+
+        // Compact initial chunks into a single one
+        final byte[] head = new byte[headSize];
+        int offset = 0;
+        for (int i = 0; i < headCount; ++i) {
+            final byte[] chunk = prevChunks.removeFirst();
+            System.arraycopy(chunk, 0, head, offset, chunk.length);
+            offset += chunk.length;
+        }
+        verify(offset == head.length);
+        prevChunks.addFirst(head);
+
+        // Now append the current chunk if need be, potentially trimming it
+        if (currentOffset == 0) {
+            return ImmutableList.copyOf(prevChunks);
+        }
+
+        final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
+        builder.addAll(prevChunks);
+        builder.add(trimChunk(currentChunk, currentOffset));
+        return builder.build();
+    }
+
+    // Ensure a single byte
+    private void ensureOneByte() {
+        if (currentChunk.length == currentOffset) {
+            nextChunk(nextChunkSize(currentChunk.length));
+        }
+    }
+
+    // Ensure more than one byte, returns the number of bytes available
+    private int ensureMoreBytes(final int requested) {
+        int available = currentChunk.length - currentOffset;
+        if (available == 0) {
+            nextChunk(nextChunkSize(currentChunk.length, requested));
+            available = currentChunk.length;
+        }
+        final int count = Math.min(requested, available);
+        verify(count > 0);
+        return count;
+    }
+
+    private void nextChunk(final int chunkSize) {
+        if (prevChunks == null) {
+            prevChunks = new ArrayDeque<>();
+        }
+
+        prevChunks.addLast(currentChunk);
+        currentChunk = new byte[chunkSize];
+        currentOffset = 0;
+    }
+
+    private void checkClosed() {
+        checkState(result != null, "Stream has not been closed yet");
+    }
+
+    private void checkNotClosed() throws IOException {
+        if (result != null) {
+            throw new IOException("Stream is already closed");
+        }
+    }
+
+    private static int nextChunkSize(final int currentSize, final int requested) {
+        return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
+                ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
+    }
+
+    private static int nextChunkSize(final int currentSize) {
+        return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
+    }
+
+    private static int initialCapacity(final int requestedSize) {
+        if (requestedSize < MIN_ARRAY_SIZE) {
+            return MIN_ARRAY_SIZE;
+        }
+        if (requestedSize > MAX_ARRAY_SIZE) {
+            return MAX_ARRAY_SIZE;
+        }
+        return ceilingPowerOfTwo(requestedSize);
+    }
+
+    private static byte[] trimChunk(final byte[] chunk, final int length) {
+        return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);
+    }
+}