* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore.persisted;
+package org.opendaylight.controller.cluster.io;
import static java.util.Objects.requireNonNull;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import com.google.common.io.ByteSink;
import java.io.DataOutput;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInput;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.opendaylight.yangtools.concepts.Immutable;
+@Beta
@NonNullByDefault
-final class ChunkedByteArray implements Immutable {
+public final class ChunkedByteArray implements Immutable {
private final ImmutableList<byte[]> chunks;
private final int size;
this.chunks = requireNonNull(chunks);
}
- static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize)
+ public static ChunkedByteArray readFrom(final ObjectInput in, final int size, final int chunkSize)
throws IOException {
final List<byte[]> chunks = new ArrayList<>(requiredChunks(size, chunkSize));
int remaining = size;
return new ChunkedByteArray(size, ImmutableList.copyOf(chunks));
}
- int size() {
+ public int size() {
return size;
}
- ChunkedInputStream openStream() {
+ public InputStream openStream() {
return new ChunkedInputStream(size, chunks.iterator());
}
- void copyTo(final DataOutput output) throws IOException {
+ public void copyTo(final DataOutput output) throws IOException {
for (byte[] chunk : chunks) {
output.write(chunk, 0, chunk.length);
}
}
+ /**
+  * Copy the contents of this array into the specified {@link ByteSink}, one chunk at a time.
+  *
+  * @param output the sink to write to
+  * @throws IOException if the sink throws while writing a chunk
+  */
+ public void copyTo(final ByteSink output) throws IOException {
+ for (byte[] chunk : chunks) {
+ output.write(chunk);
+ }
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("size", size).add("chunkCount", chunks.size()).toString();
}
+ @VisibleForTesting
ImmutableList<byte[]> getChunks() {
return chunks;
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.cluster.datastore.persisted;
+package org.opendaylight.controller.cluster.io;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.math.IntMath.ceilingPowerOfTwo;
+import static com.google.common.math.IntMath.isPowerOfTwo;
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import java.io.ByteArrayOutputStream;
* <ul>
* <li>Data acquisition, during which we start with an initial (power-of-two) size and proceed to fill it up. Once the
* buffer is full, we stash it, allocate a new buffer twice its size and repeat the process. Once we hit
- * {@link #MAX_ARRAY_SIZE}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
+ * {@code maxChunkSize}, we do not grow subsequent buffer. We also can skip some intermediate sizes if data
* is introduced in large chunks via {@link #write(byte[], int, int)}.</li>
* <li>Buffer consolidation, which occurs when the stream is {@link #close() closed}. At this point we construct the
* final collection of buffers.</li>
* <p>
* The data acquisition strategy results in predictably-sized buffers, which are growing exponentially in size until
* they hit maximum size. Intrinsic property here is that the total capacity of chunks created during the ramp up is
- * guaranteed to fit into {@code MAX_ARRAY_SIZE}, hence they can readily be compacted into a single buffer, which
- * replaces them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm
- * guarantees total number of internal copy operations is capped at {@code 2 * MAX_ARRAY_SIZE}. The number of produced
- * chunks is also well-controlled:
+ * guaranteed to fit into {@code maxChunkSize}, hence they can readily be compacted into a single buffer, which replaces
+ * them. Combined with the requirement to trim the last buffer to have accurate length, this algorithm guarantees total
+ * number of internal copy operations is capped at {@code 2 * maxChunkSize}. The number of produced chunks is also
+ * well-controlled:
* <ul>
* <li>for slowly-built data, we will maintain perfect packing</li>
 * <li>for fast-startup data, we will be at most one chunk away from packing perfectly</li>
* @author Robert Varga
* @author Tomas Olvecky
*/
-final class ChunkedOutputStream extends OutputStream {
- static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
- "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+@Beta
+public final class ChunkedOutputStream extends OutputStream {
private static final int MIN_ARRAY_SIZE = 32;
+ private final int maxChunkSize;
+
// byte[] or a List
private Object result;
// Lazily-allocated to reduce pressure for single-chunk streams
private int currentOffset;
private int size;
- ChunkedOutputStream(final int requestedInitialCapacity) {
- currentChunk = new byte[initialCapacity(requestedInitialCapacity)];
+ /**
+  * Create a new stream. Internal chunk sizes ramp up from a power-of-two derived from
+  * {@code requestedInitialCapacity} and are capped at {@code maxChunkSize}.
+  *
+  * @param requestedInitialCapacity hint for the first chunk's size
+  * @param maxChunkSize maximum size of a single chunk, must be a positive power of two
+  * @throws IllegalArgumentException if {@code maxChunkSize} is not a positive power of two
+  */
+ public ChunkedOutputStream(final int requestedInitialCapacity, final int maxChunkSize) {
+ // Check positivity first: IntMath.isPowerOfTwo() also returns false for non-positive
+ // values, so checking it first would report "not a power of two" for e.g. 0 or -1
+ // and leave the positivity check unreachable.
+ checkArgument(maxChunkSize > 0, "Maximum chunk size %s is not positive", maxChunkSize);
+ checkArgument(isPowerOfTwo(maxChunkSize), "Maximum chunk size %s is not a power of two", maxChunkSize);
+ this.maxChunkSize = maxChunkSize;
+ currentChunk = new byte[initialCapacity(requestedInitialCapacity, maxChunkSize)];
 }
@Override
}
}
- int size() {
+ public int size() {
return size;
}
- ChunkedByteArray toChunkedByteArray() {
+ public Variant<byte[], ChunkedByteArray> toVariant() {
checkClosed();
- return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
- : (ImmutableList<byte[]>) result);
+ return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
+ : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
}
- Variant<byte[], ChunkedByteArray> toVariant() {
+ @VisibleForTesting
+ ChunkedByteArray toChunkedByteArray() {
checkClosed();
- return result instanceof byte[] ? Variant.ofFirst((byte[]) result)
- : Variant.ofSecond(new ChunkedByteArray(size, (ImmutableList<byte[]>) result));
+ return new ChunkedByteArray(size, result instanceof byte[] ? ImmutableList.of((byte[]) result)
+ : (ImmutableList<byte[]>) result);
}
private Object computeResult() {
// Simple case: it's only the current buffer, return that
return trimChunk(currentChunk, currentOffset);
}
- if (size <= MAX_ARRAY_SIZE) {
+ if (size <= maxChunkSize) {
// We have collected less than full chunk of data, let's have just one chunk ...
final byte[] singleChunk;
if (currentOffset == 0 && prevChunks.size() == 1) {
final Iterator<byte[]> it = prevChunks.iterator();
do {
final byte[] chunk = it.next();
- if (chunk.length == MAX_ARRAY_SIZE) {
+ if (chunk.length == maxChunkSize) {
break;
}
}
}
- private static int nextChunkSize(final int currentSize, final int requested) {
- return currentSize == MAX_ARRAY_SIZE || requested >= MAX_ARRAY_SIZE
- ? MAX_ARRAY_SIZE : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
+ private int nextChunkSize(final int currentSize, final int requested) {
+ return currentSize == maxChunkSize || requested >= maxChunkSize
+ ? maxChunkSize : Math.max(currentSize * 2, ceilingPowerOfTwo(requested));
}
- private static int nextChunkSize(final int currentSize) {
- return currentSize < MAX_ARRAY_SIZE ? currentSize * 2 : MAX_ARRAY_SIZE;
+ private int nextChunkSize(final int currentSize) {
+ return currentSize < maxChunkSize ? currentSize * 2 : maxChunkSize;
}
- private static int initialCapacity(final int requestedSize) {
+ private static int initialCapacity(final int requestedSize, final int maxChunkSize) {
if (requestedSize < MIN_ARRAY_SIZE) {
return MIN_ARRAY_SIZE;
}
- if (requestedSize > MAX_ARRAY_SIZE) {
- return MAX_ARRAY_SIZE;
+ if (requestedSize > maxChunkSize) {
+ return maxChunkSize;
}
return ceilingPowerOfTwo(requestedSize);
}
package org.opendaylight.controller.cluster.datastore.persisted;
import static com.google.common.base.Verify.verifyNotNull;
+import static com.google.common.math.IntMath.ceilingPowerOfTwo;
import static java.util.Objects.requireNonNull;
-import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
+import org.opendaylight.controller.cluster.io.ChunkedByteArray;
+import org.opendaylight.controller.cluster.io.ChunkedOutputStream;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
import org.opendaylight.yangtools.concepts.Variant;
import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
private static final long serialVersionUID = 1L;
+ private static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
+ "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
+
private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
CommitTransactionPayload() {
public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity)
throws IOException {
- final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity);
+ final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE);
try (DataOutputStream dos = new DataOutputStream(cos)) {
transactionId.writeTo(dos);
DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate);