X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fpersisted%2FCommitTransactionPayload.java;h=3fc636b851022fcc39e65acee3365113c05b4666;hp=aaa86413bee47533188e21b0662d3ebb77d13748;hb=1d5ca4009be6c61d7b61989799037ad8f1ab7a75;hpb=3859df9beca8f13f1ff2b2744ed3470a1715bec3 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index aaa86413be..3fc636b851 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -7,24 +7,35 @@ */ package org.opendaylight.controller.cluster.datastore.persisted; +import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.math.IntMath.ceilingPowerOfTwo; import static java.util.Objects.requireNonNull; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; -import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; +import java.io.StreamCorruptedException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; +import org.opendaylight.controller.cluster.io.ChunkedByteArray; +import org.opendaylight.controller.cluster.io.ChunkedOutputStream; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; +import org.opendaylight.yangtools.concepts.Either; +import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,86 +46,183 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public final class CommitTransactionPayload extends Payload implements Serializable { +public abstract class CommitTransactionPayload extends IdentifiablePayload + implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); + private static final long serialVersionUID = 1L; - private static final class Proxy implements Externalizable { - private static final long serialVersionUID = 1L; - private byte[] serialized; + private static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( + "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024)); - // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't - // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. - @SuppressWarnings("checkstyle:RedundantModifier") - public Proxy() { - // For Externalizable + private volatile Entry candidate = null; + + CommitTransactionPayload() { + + } + + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity) + throws IOException { + final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); + try (DataOutputStream dos = new DataOutputStream(cos)) { + transactionId.writeTo(dos); + DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate); } - Proxy(final byte[] serialized) { + final Either source = cos.toVariant(); + LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size()); + return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond()); + } + + @VisibleForTesting + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate, final PayloadVersion version) throws IOException { + return create(transactionId, candidate, version, 512); + } + + @VisibleForTesting + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate) throws IOException { + return create(transactionId, candidate, PayloadVersion.current()); + } + + public @NonNull Entry getCandidate() throws IOException { + Entry localCandidate = candidate; + if (localCandidate == null) { + synchronized (this) { + localCandidate = candidate; + if (localCandidate == null) { + candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + } + } + } + return localCandidate; + } + + public final @NonNull Entry getCandidate( + final ReusableStreamReceiver receiver) throws IOException { + final DataInput in = newDataInput(); + return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), + DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); + } + + @Override + public TransactionIdentifier getIdentifier() { + try { + return getCandidate().getKey(); + } catch (IOException e) { + throw new IllegalStateException("Candidate deserialization failed.", e); + } + } + + /** + * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping + * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that + * this was the last time the candidate was needed ant it is safe to be cleared. + */ + public Entry acquireCandidate() throws IOException { + final Entry localCandidate = getCandidate(); + candidate = null; + return localCandidate; + } + + abstract void writeBytes(ObjectOutput out) throws IOException; + + abstract DataInput newDataInput(); + + final Object writeReplace() { + return new Proxy(this); + } + + private static final class Simple extends CommitTransactionPayload { + private static final long serialVersionUID = 1L; + + private final byte[] serialized; + + Simple(final byte[] serialized) { this.serialized = requireNonNull(serialized); } @Override - public void writeExternal(final ObjectOutput out) throws IOException { - out.writeInt(serialized.length); - out.write(serialized); + public int size() { + return serialized.length; } @Override - public void readExternal(final ObjectInput in) throws IOException { - final int length = in.readInt(); - serialized = new byte[length]; - in.readFully(serialized); + DataInput newDataInput() { + return ByteStreams.newDataInput(serialized); } - private Object readResolve() { - return new CommitTransactionPayload(serialized); + @Override + void writeBytes(final ObjectOutput out) throws IOException { + out.write(serialized); } } - private static final long serialVersionUID = 1L; + private static final class Chunked extends CommitTransactionPayload { + private static final long serialVersionUID = 1L; - private final byte[] serialized; + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy") + private final ChunkedByteArray source; - CommitTransactionPayload(final byte[] serialized) { - this.serialized = requireNonNull(serialized); - } + Chunked(final ChunkedByteArray source) { + this.source = requireNonNull(source); + } - public static CommitTransactionPayload create(final TransactionIdentifier transactionId, - final DataTreeCandidate candidate, final int initialSerializedBufferCapacity) throws IOException { - final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity); - transactionId.writeTo(out); - DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate); - final byte[] serialized = out.toByteArray(); + @Override + void writeBytes(final ObjectOutput out) throws IOException { + source.copyTo(out); + } - LOG.debug("Initial buffer capacity {}, actual serialized size {}", - initialSerializedBufferCapacity, serialized.length); + @Override + public int size() { + return source.size(); + } - return new CommitTransactionPayload(serialized); + @Override + DataInput newDataInput() { + return new DataInputStream(source.openStream()); + } } - @VisibleForTesting - public static CommitTransactionPayload create(final TransactionIdentifier transactionId, - final DataTreeCandidate candidate) throws IOException { - return create(transactionId, candidate, 512); - } + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; - public Entry getCandidate() throws IOException { - return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); - } + private CommitTransactionPayload payload; - public Entry getCandidate( - final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException { - final DataInput in = ByteStreams.newDataInput(serialized); - return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), - DataTreeCandidateInputOutput.readDataTreeCandidate(in, writer)); - } + // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't + // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public Proxy() { + // For Externalizable + } - @Override - public int size() { - return serialized.length; - } + Proxy(final CommitTransactionPayload payload) { + this.payload = requireNonNull(payload); + } - private Object writeReplace() { - return new Proxy(serialized); + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeInt(payload.size()); + payload.writeBytes(out); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException { + final int length = in.readInt(); + if (length < 0) { + throw new StreamCorruptedException("Invalid payload length " + length); + } else if (length < MAX_ARRAY_SIZE) { + final byte[] serialized = new byte[length]; + in.readFully(serialized); + payload = new Simple(serialized); + } else { + payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE)); + } + } + + private Object readResolve() { + return verifyNotNull(payload); + } } }