Split up transaction chunks 13/85013/1
author Robert Varga <robert.varga@pantheon.tech>
Thu, 19 Sep 2019 23:50:56 +0000 (01:50 +0200)
committer Robert Varga <nite@hq.sk>
Wed, 9 Oct 2019 10:06:11 +0000 (10:06 +0000)
This adds the infrastructure to prevent allocation of large byte[]s,
as once those exceed 0.5-16 MiB (half the G1GC region size) they end
up in humongous object regions under G1GC. Not only that, but past a
certain cut-off point the copying of such arrays starts to dominate
performance.

What we do here is ensure we only ever allocate chunks of up to a
configurable size, defaulting to 256 KiB, and keep those chunks in a
list. This way we may end up with slightly larger overhead, but that
really is negligible -- even a 2 GiB payload would end up using only
about 8K arrays.
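(At the default chunk size that works out to 2 GiB / 256 KiB = 8192
chunks, each comfortably below the humongous-object threshold.)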

While the input/output streams are similar to
org.apache.commons.io.output.ByteArrayOutputStream, the design here
is geared towards having the intermediate chunked representation
available, as well as being able to devolve to a single byte[] for
memory efficiency reasons.
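
For illustration only, a minimal sketch of how the new classes fit
together (initialCapacity and payloadBytes are placeholder names,
IOException handling is elided; the real call site is
CommitTransactionPayload.create()):

    ChunkedOutputStream cos = new ChunkedOutputStream(initialCapacity);
    try (DataOutputStream dos = new DataOutputStream(cos)) {
        dos.write(payloadBytes);       // any serialization goes through here
    }                                  // close() seals the chunk list

    // Inspect the chunked form and read it back without a single large array
    ChunkedByteArray chunked = cos.toChunkedByteArray();
    DataInputStream dis = new DataInputStream(chunked.openStream());
    byte[] roundTrip = new byte[chunked.size()];
    dis.readFully(roundTrip);

CommitTransactionPayload.create() uses cos.toVariant() instead, so a
result that fits into a single chunk stays a plain byte[].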

JIRA: CONTROLLER-1920
Change-Id: I2b79a633ebf4fdf8d68d2accc644326e30b41f22
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedByteArray.java
new file mode 100644 (file)
index 0000000..73a7df9
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.util.ArrayList;
+import java.util.List;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.opendaylight.yangtools.concepts.Immutable;
+
+@NonNullByDefault
+final class ChunkedByteArray implements Immutable {
+    private final ImmutableList<byte[]> chunks;
+    private final int size;
+
+    ChunkedByteArray(final int size, final ImmutableList<byte[]> chunks) {
+        this.size = size;
+        this.chunks = requireNonNull(chunks);
+    }
+
+    static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize)
+            throws IOException {
+        final List<byte[]> chunks = new ArrayList<>(requiredChunks(size, chunkSize));
+        int remaining = size;
+        do {
+            final byte[] buffer = new byte[Math.min(remaining, chunkSize)];
+            in.readFully(buffer);
+            chunks.add(buffer);
+            remaining -= buffer.length;
+        } while (remaining != 0);
+
+        return new ChunkedByteArray(size, ImmutableList.copyOf(chunks));
+    }
+
+    int size() {
+        return size;
+    }
+
+    ChunkedInputStream openStream() {
+        return new ChunkedInputStream(size, chunks.iterator());
+    }
+
+    void copyTo(final DataOutput output) throws IOException {
+        for (byte[] chunk : chunks) {
+            output.write(chunk, 0, chunk.length);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("size", size).add("chunkCount", chunks.size()).toString();
+    }
+
+    ImmutableList<byte[]> getChunks() {
+        return chunks;
+    }
+
+    private static int requiredChunks(final int size, final int chunkSize) {
+        final int div = size / chunkSize;
+        return size % chunkSize == 0 ? div : div + 1;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedInputStream.java
new file mode 100644 (file)
index 0000000..68a906e
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.util.Iterator;
+
+final class ChunkedInputStream extends InputStream {
+    private final Iterator<byte[]> remainingChunks;
+
+    private byte[] currentChunk;
+    private int currentLimit;
+    private int currentOffset;
+    private int available;
+
+    ChunkedInputStream(final int size, final Iterator<byte[]> iterator) {
+        remainingChunks = requireNonNull(iterator);
+        currentChunk = remainingChunks.next();
+        currentLimit = currentChunk.length;
+        available = size;
+    }
+
+    @Override
+    public int available() {
+        return available;
+    }
+
+    @Override
+    public int read() {
+        if (currentChunk == null) {
+            return -1;
+        }
+
+        int ret = currentChunk[currentOffset] & 0xff;
+        consumeBytes(1);
+        return ret;
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:ParameterName")
+    public int read(final byte[] b) {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:ParameterName")
+    public int read(final byte[] b, final int off, final int len) {
+        if (len < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (currentChunk == null) {
+            return -1;
+        }
+
+        final int result = Math.min(available, len);
+        int toOffset = off;
+        int toCopy = result;
+
+        while (toCopy != 0) {
+            final int count = currentBytes(toCopy);
+            System.arraycopy(currentChunk, currentOffset, b, toOffset, count);
+            consumeBytes(count);
+            toOffset += count;
+            toCopy -= count;
+        }
+
+        return result;
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:ParameterName")
+    public long skip(final long n) {
+        final int result = (int) Math.min(available, n);
+
+        int toSkip = result;
+        while (toSkip != 0) {
+            final int count = currentBytes(toSkip);
+            consumeBytes(count);
+            toSkip -= count;
+        }
+
+        return result;
+    }
+
+    private int currentBytes(final int desired) {
+        return Math.min(desired, currentLimit - currentOffset);
+    }
+
+    private void consumeBytes(final int count) {
+        currentOffset += count;
+        available -= count;
+
+        if (currentOffset == currentLimit) {
+            if (remainingChunks.hasNext()) {
+                currentChunk = remainingChunks.next();
+                currentLimit = currentChunk.length;
+            } else {
+                currentChunk = null;
+                currentLimit = 0;
+            }
+            currentOffset = 0;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStream.java
new file mode 100644 (file)
index 0000000..717c3d1
--- /dev/null
@@ -0,0 +1,254 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.math.IntMath.ceilingPowerOfTwo;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.Iterator;
+import org.opendaylight.yangtools.concepts.Variant;
+
+/**
+ * An {@link OutputStream} implementation which collects data in a series of {@code byte[]} chunks, each of which has
+ * a fixed maximum size. This is generally preferable to {@link ByteArrayOutputStream}, as that can result in huge
+ * byte arrays -- which can create unnecessary pressure on the GC (as well as a lot of copying).
+ *
+ * <p>
+ * This class takes a different approach: it recognizes that the result of buffering will be collected at some point, when
+ * the stream is already closed (and thus unmodifiable). Thus it splits the process into two steps:
+ * <ul>
+ *   <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
+ *       buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
+ *       {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffers. We can also skip some intermediate sizes if data
+ *       is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
+ *   <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
+ *       final collection of buffers.</li>
+ * </ul>
+ *
+ * <p>
+ * The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
+ * they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
+ * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
+ * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
+ * guarantees the total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
+ * chunks is also well-controlled:
+ * <ul>
+ *   <li>for slowly-built data, we will maintain perfect packing</li>
+ *   <li>for fast-startup data, we will be at most one chunk away from packing perfectly</li>
+ * </ul>
+ *
+ * @author Robert Varga
+ * @author Tomas Olvecky
+ */
+final class ChunkedOutputStream extends OutputStream {
+    static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
+        "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+    private static final int MIN_ARRAY_SIZE = 32;
+
+    // Either the resulting byte[] or an ImmutableList<byte[]>, assigned when the stream is closed
+    private Object result;
+    // Lazily-allocated to reduce pressure for single-chunk streams
+    private Deque<byte[]> prevChunks;
+
+    private byte[] currentChunk;
+    private int currentOffset;
+    private int size;
+
+    ChunkedOutputStream(final int requestedInitialCapacity) {
+        currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:ParameterName")
+    public void write(final int b) throws IOException {
+        checkNotClosed();
+        ensureOneByte();
+        currentChunk[currentOffset] = (byte) b;
+        currentOffset++;
+        size++;
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:ParameterName")
+    public void write(final byte[] b, final int off, final int len) throws IOException {
+        if (len < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+        checkNotClosed();
+
+        int fromOffset = off;
+        int toCopy = len;
+
+        while (toCopy != 0) {
+            final int count = ensureMoreBytes(toCopy);
+            System.arraycopy(b, fromOffset, currentChunk, currentOffset, count);
+            currentOffset += count;
+            size += count;
+            fromOffset += count;
+            toCopy -= count;
+        }
+    }
+
+    @Override
+    public void close() {
+        if (result == null) {
+            result = computeResult();
+            prevChunks = null;
+            currentChunk = null;
+        }
+    }
+
+    int size() {
+        return size;
+    }
+
+    ChunkedByteArray toChunkedByteArray() {
+        checkClosed();
+        return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
+            : (ImmutableList<byte[]>) result);
+    }
+
+    Variant<byte[], ChunkedByteArray> toVariant() {
+        checkClosed();
+        return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
+                : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
+    }
+
+    private Object computeResult() {
+        if (prevChunks == null) {
+            // Simple case: it's only the current buffer, return that
+            return trimChunk(currentChunk, currentOffset);
+        }
+        if (size <= MAX_ARRAY_SIZE) {
+            // We have collected less than full chunk of data, let's have just one chunk ...
+            final byte[] singleChunk;
+            if (currentOffset == 0 && prevChunks.size() == 1) {
+                // ... which we have readily available
+                return prevChunks.getFirst();
+            }
+
+            // ... which we need to collect
+            singleChunk = new byte[size];
+            int offset = 0;
+            for (byte[] chunk : prevChunks) {
+                System.arraycopy(chunk, 0, singleChunk, offset, chunk.length);
+                offset += chunk.length;
+            }
+            System.arraycopy(currentChunk, 0, singleChunk, offset, currentOffset);
+            return singleChunk;
+        }
+
+        // Determine number of chunks to aggregate and their required storage. Normally storage would be MAX_ARRAY_SIZE,
+        // but we can have faster-than-exponential startup, which ends up needing less storage -- and we do not want to
+        // end up trimming this array.
+        int headSize = 0;
+        int headCount = 0;
+        final Iterator<byte[]> it = prevChunks.iterator();
+        do {
+            final byte[] chunk = it.next();
+            if (chunk.length == MAX_ARRAY_SIZE) {
+                break;
+            }
+
+            headSize += chunk.length;
+            headCount++;
+        } while (it.hasNext());
+
+        // Compact initial chunks into a single one
+        final byte[] head = new byte[headSize];
+        int offset = 0;
+        for (int i = 0; i < headCount; ++i) {
+            final byte[] chunk = prevChunks.removeFirst();
+            System.arraycopy(chunk, 0, head, offset, chunk.length);
+            offset += chunk.length;
+        }
+        verify(offset == head.length);
+        prevChunks.addFirst(head);
+
+        // Now append the current chunk if need be, potentially trimming it
+        if (currentOffset == 0) {
+            return ImmutableList.copyOf(prevChunks);
+        }
+
+        final Builder<byte[]> builder = ImmutableList.builderWithExpectedSize(prevChunks.size() + 1);
+        builder.addAll(prevChunks);
+        builder.add(trimChunk(currentChunk, currentOffset));
+        return builder.build();
+    }
+
+    // Ensure space for a single byte
+    private void ensureOneByte() {
+        if (currentChunk.length == currentOffset) {
+            nextChunk(nextChunkSize(currentChunk.length));
+        }
+    }
+
+    // Ensure space for at least one more byte, returning how many of the requested bytes fit into the current chunk
+    private int ensureMoreBytes(final int requested) {
+        int available = currentChunk.length - currentOffset;
+        if (available == 0) {
+            nextChunk(nextChunkSize(currentChunk.length, requested));
+            available = currentChunk.length;
+        }
+        final int count = Math.min(requested, available);
+        verify(count > 0);
+        return count;
+    }
+
+    private void nextChunk(final int chunkSize) {
+        if (prevChunks == null) {
+            prevChunks = new ArrayDeque<>();
+        }
+
+        prevChunks.addLast(currentChunk);
+        currentChunk = new byte[chunkSize];
+        currentOffset = 0;
+    }
+
+    private void checkClosed() {
+        checkState(result != null, "Stream has not been closed yet");
+    }
+
+    private void checkNotClosed() throws IOException {
+        if (result != null) {
+            throw new IOException("Stream is already closed");
+        }
+    }
+
+    private static int nextChunkSize(final int currentSize, final int requested) {
+        return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
+                ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
+    }
+
+    private static int nextChunkSize(final int currentSize) {
+        return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
+    }
+
+    private static int initialCapacity(final int requestedSize) {
+        if (requestedSize < MIN_ARRAY_SIZE) {
+            return MIN_ARRAY_SIZE;
+        }
+        if (requestedSize > MAX_ARRAY_SIZE) {
+            return MAX_ARRAY_SIZE;
+        }
+        return ceilingPowerOfTwo(requestedSize);
+    }
+
+    private static byte[] trimChunk(final byte[] chunk, final int length) {
+        return chunk.length == length ? chunk : Arrays.copyOf(chunk, length);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
index fb66581c28f1d9d4f0b8a01c8e7ada7225357e1d..73bdd6f31baa98539d1c312a648732d4f1dd316e 100644 (file)
@@ -7,22 +7,28 @@
  */
 package org.opendaylight.controller.cluster.datastore.persisted;
 
+import static com.google.common.base.Verify.verifyNotNull;
 import static java.util.Objects.requireNonNull;
+import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE;
 
 import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.io.StreamCorruptedException;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Variant;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
@@ -36,61 +42,26 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 @Beta
-public final class CommitTransactionPayload extends Payload implements Serializable {
+public abstract class CommitTransactionPayload extends Payload implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
-
-    private static final class Proxy implements Externalizable {
-        private static final long serialVersionUID = 1L;
-        private byte[] serialized;
-
-        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
-        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
-        @SuppressWarnings("checkstyle:RedundantModifier")
-        public Proxy() {
-            // For Externalizable
-        }
-
-        Proxy(final byte[] serialized) {
-            this.serialized = requireNonNull(serialized);
-        }
-
-        @Override
-        public void writeExternal(final ObjectOutput out) throws IOException {
-            out.writeInt(serialized.length);
-            out.write(serialized);
-        }
-
-        @Override
-        public void readExternal(final ObjectInput in) throws IOException {
-            final int length = in.readInt();
-            serialized = new byte[length];
-            in.readFully(serialized);
-        }
-
-        private Object readResolve() {
-            return new CommitTransactionPayload(serialized);
-        }
-    }
-
     private static final long serialVersionUID = 1L;
 
-    private final byte[] serialized;
+    CommitTransactionPayload() {
 
-    CommitTransactionPayload(final byte[] serialized) {
-        this.serialized = requireNonNull(serialized);
     }
 
     public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
             final DataTreeCandidate candidate, final int initialSerializedBufferCapacity) throws IOException {
-        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
-        transactionId.writeTo(out);
-        DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate);
-        final byte[] serialized = out.toByteArray();
 
-        LOG.debug("Initial buffer capacity {}, actual serialized size {}",
-                initialSerializedBufferCapacity, serialized.length);
+        final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity);
+        try (DataOutputStream dos = new DataOutputStream(cos)) {
+            transactionId.writeTo(dos);
+            DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, candidate);
+        }
 
-        return new CommitTransactionPayload(serialized);
+        final Variant<byte[], ChunkedByteArray> source = cos.toVariant();
+        LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size());
+        return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond());
     }
 
     @VisibleForTesting
@@ -103,19 +74,110 @@ public final class CommitTransactionPayload extends Payload implements Serializa
         return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
     }
 
-    public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate(final ReusableStreamReceiver receiver)
-            throws IOException {
-        final DataInput in = ByteStreams.newDataInput(serialized);
+    public final Entry<TransactionIdentifier, DataTreeCandidate> getCandidate(
+            final ReusableStreamReceiver receiver) throws IOException {
+        final DataInput in = newDataInput();
         return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
                 DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
     }
 
-    @Override
-    public int size() {
-        return serialized.length;
+    abstract void writeBytes(ObjectOutput out) throws IOException;
+
+    abstract DataInput newDataInput();
+
+    final Object writeReplace() {
+        return new Proxy(this);
     }
 
-    private Object writeReplace() {
-        return new Proxy(serialized);
+    private static final class Simple extends CommitTransactionPayload {
+        private static final long serialVersionUID = 1L;
+
+        private final byte[] serialized;
+
+        Simple(final byte[] serialized) {
+            this.serialized = requireNonNull(serialized);
+        }
+
+        @Override
+        public int size() {
+            return serialized.length;
+        }
+
+        @Override
+        DataInput newDataInput() {
+            return ByteStreams.newDataInput(serialized);
+        }
+
+        @Override
+        void writeBytes(final ObjectOutput out) throws IOException {
+            out.write(serialized);
+        }
+    }
+
+    private static final class Chunked extends CommitTransactionPayload {
+        private static final long serialVersionUID = 1L;
+
+        @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy")
+        private final ChunkedByteArray source;
+
+        Chunked(final ChunkedByteArray source) {
+            this.source = requireNonNull(source);
+        }
+
+        @Override
+        void writeBytes(final ObjectOutput out) throws IOException {
+            source.copyTo(out);
+        }
+
+        @Override
+        public int size() {
+            return source.size();
+        }
+
+        @Override
+        DataInput newDataInput() {
+            return new DataInputStream(source.openStream());
+        }
+    }
+
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        private CommitTransactionPayload payload;
+
+        // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
+        // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final CommitTransactionPayload payload) {
+            this.payload = requireNonNull(payload);
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeInt(payload.size());
+            payload.writeBytes(out);
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException {
+            final int length = in.readInt();
+            if (length < 0) {
+                throw new StreamCorruptedException("Invalid payload length " + length);
+            } else if (length < MAX_ARRAY_SIZE) {
+                final byte[] serialized = new byte[length];
+                in.readFully(serialized);
+                payload = new Simple(serialized);
+            } else {
+                payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE));
+            }
+        }
+
+        private Object readResolve() {
+            return verifyNotNull(payload);
+        }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/ChunkedOutputStreamTest.java
new file mode 100644 (file)
index 0000000..16f7bd8
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import org.junit.Test;
+
+public class ChunkedOutputStreamTest {
+    private static final int INITIAL_SIZE = 256;
+
+    private final ChunkedOutputStream stream = new ChunkedOutputStream(INITIAL_SIZE);
+
+    @Test
+    public void testBasicWrite() throws IOException {
+        for (int i = 0; i < INITIAL_SIZE; ++i) {
+            stream.write(i);
+        }
+
+        final byte[] chunk = assertFinishedStream(INITIAL_SIZE, 1).get(0);
+        assertEquals(INITIAL_SIZE, chunk.length);
+        for (int i = 0; i < INITIAL_SIZE; ++i) {
+            assertEquals((byte) i, chunk[i]);
+        }
+    }
+
+    @Test
+    public void testBasicLargeWrite() throws IOException {
+        final byte[] array = createArray(INITIAL_SIZE);
+        stream.write(array);
+        final byte[] chunk = assertFinishedStream(INITIAL_SIZE, 1).get(0);
+        assertArrayEquals(array, chunk);
+    }
+
+    @Test
+    public void testGrowWrite() throws IOException {
+        for (int i = 0; i < INITIAL_SIZE * 2; ++i) {
+            stream.write(i);
+        }
+
+        final byte[] chunk = assertFinishedStream(INITIAL_SIZE * 2, 1).get(0);
+        assertEquals(INITIAL_SIZE * 2, chunk.length);
+        for (int i = 0; i < INITIAL_SIZE * 2; ++i) {
+            assertEquals((byte) i, chunk[i]);
+        }
+    }
+
+    @Test
+    public void testGrowLargeWrite() throws IOException {
+        final byte[] array = createArray(INITIAL_SIZE * 2);
+        stream.write(array);
+        final byte[] chunk = assertFinishedStream(INITIAL_SIZE * 2, 1).get(0);
+        assertArrayEquals(array, chunk);
+    }
+
+    @Test
+    public void testTwoChunksWrite() throws IOException {
+        int size = ChunkedOutputStream.MAX_ARRAY_SIZE + 1;
+        for (int i = 0; i < size; ++i) {
+            stream.write(i);
+        }
+
+        int counter = 0;
+        for (byte[] chunk: assertFinishedStream(size, 2)) {
+            for (byte actual: chunk) {
+                assertEquals((byte) counter++, actual);
+            }
+        }
+    }
+
+    private List<byte[]> assertFinishedStream(final int expectedSize, final int expectedChunks) {
+        stream.close();
+        final ChunkedByteArray array = stream.toChunkedByteArray();
+        assertEquals(expectedSize, array.size());
+
+        final List<byte[]> chunks = array.getChunks();
+        assertEquals(expectedChunks, chunks.size());
+        return chunks;
+    }
+
+    private static byte[] createArray(final int size) {
+        final byte[] array = new byte[size];
+        for (int i = 0; i < size; ++i) {
+            array[i] = (byte) i;
+        }
+        return array;
+    }
+}