X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fpersisted%2FCommitTransactionPayload.java;h=5337530ece249128e078b7a88fa4a0b071c7cb1b;hb=a66474c41883733413a6851d49fb5ade892764b3;hp=213d61b8dda72ad24b6eb3577d43dad0d0d859ab;hpb=12fcdfe39aa26dcba7fd3bb4d4c68e3d02e65c51;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index 213d61b8dd..5337530ece 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -7,21 +7,37 @@ */ package org.opendaylight.controller.cluster.datastore.persisted; +import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.math.IntMath.ceilingPowerOfTwo; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.Beta; -import com.google.common.base.Preconditions; -import com.google.common.io.ByteArrayDataOutput; +import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; +import java.io.StreamCorruptedException; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; +import org.opendaylight.controller.cluster.io.ChunkedByteArray; +import org.opendaylight.controller.cluster.io.ChunkedOutputStream; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; +import org.opendaylight.yangtools.concepts.Either; +import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Payload persisted when a transaction commits. It contains the transaction identifier and the @@ -30,10 +46,149 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; * @author Robert Varga */ @Beta -public final class CommitTransactionPayload extends Payload implements Serializable { +public abstract class CommitTransactionPayload extends IdentifiablePayload + implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); + private static final long serialVersionUID = 1L; + + private static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( + "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024)); + + private volatile Entry candidate = null; + + CommitTransactionPayload() { + + } + + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity) + throws IOException { + final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); + try (DataOutputStream dos = new DataOutputStream(cos)) { + transactionId.writeTo(dos); + DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate); + } + + final Either source = cos.toVariant(); + LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size()); + return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond()); + } + + @VisibleForTesting + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate, final PayloadVersion version) throws IOException { + return create(transactionId, candidate, version, 512); + } + + @VisibleForTesting + public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate) throws IOException { + return create(transactionId, candidate, PayloadVersion.current()); + } + + public @NonNull Entry getCandidate() throws IOException { + Entry localCandidate = candidate; + if (localCandidate == null) { + synchronized (this) { + localCandidate = candidate; + if (localCandidate == null) { + candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + } + } + } + return localCandidate; + } + + public final @NonNull Entry getCandidate( + final ReusableStreamReceiver receiver) throws IOException { + final DataInput in = newDataInput(); + return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), + DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); + } + + @Override + public TransactionIdentifier getIdentifier() { + try { + return getCandidate().getKey(); + } catch (IOException e) { + throw new IllegalStateException("Candidate deserialization failed.", e); + } + } + + /** + * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping + * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that + * this was the last time the candidate was needed ant it is safe to be cleared. + */ + public Entry acquireCandidate() throws IOException { + final Entry localCandidate = getCandidate(); + candidate = null; + return localCandidate; + } + + abstract void writeBytes(ObjectOutput out) throws IOException; + + abstract DataInput newDataInput(); + + final Object writeReplace() { + return new Proxy(this); + } + + private static final class Simple extends CommitTransactionPayload { + private static final long serialVersionUID = 1L; + + private final byte[] serialized; + + Simple(final byte[] serialized) { + this.serialized = requireNonNull(serialized); + } + + @Override + public int size() { + return serialized.length; + } + + @Override + DataInput newDataInput() { + return ByteStreams.newDataInput(serialized); + } + + @Override + void writeBytes(final ObjectOutput out) throws IOException { + out.write(serialized); + } + } + + private static final class Chunked extends CommitTransactionPayload { + private static final long serialVersionUID = 1L; + + @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy") + private final ChunkedByteArray source; + + Chunked(final ChunkedByteArray source) { + this.source = requireNonNull(source); + } + + @Override + void writeBytes(final ObjectOutput out) throws IOException { + source.copyTo(out); + } + + @Override + public int size() { + return source.size(); + } + + @Override + DataInput newDataInput() { + return new DataInputStream(source.openStream()); + } + } + private static final class Proxy implements Externalizable { private static final long serialVersionUID = 1L; - private byte[] serialized; + + private CommitTransactionPayload payload; // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. @@ -42,56 +197,32 @@ public final class CommitTransactionPayload extends Payload implements Serializa // For Externalizable } - Proxy(final byte[] serialized) { - this.serialized = Preconditions.checkNotNull(serialized); + Proxy(final CommitTransactionPayload payload) { + this.payload = requireNonNull(payload); } @Override public void writeExternal(final ObjectOutput out) throws IOException { - out.writeInt(serialized.length); - out.write(serialized); + out.writeInt(payload.size()); + payload.writeBytes(out); } @Override public void readExternal(final ObjectInput in) throws IOException { final int length = in.readInt(); - serialized = new byte[length]; - in.readFully(serialized); + if (length < 0) { + throw new StreamCorruptedException("Invalid payload length " + length); + } else if (length < MAX_ARRAY_SIZE) { + final byte[] serialized = new byte[length]; + in.readFully(serialized); + payload = new Simple(serialized); + } else { + payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE)); + } } private Object readResolve() { - return new CommitTransactionPayload(serialized); + return verifyNotNull(payload); } } - - private static final long serialVersionUID = 1L; - - private final byte[] serialized; - - CommitTransactionPayload(final byte[] serialized) { - this.serialized = Preconditions.checkNotNull(serialized); - } - - public static CommitTransactionPayload create(final TransactionIdentifier transactionId, - final DataTreeCandidate candidate) throws IOException { - final ByteArrayDataOutput out = ByteStreams.newDataOutput(); - transactionId.writeTo(out); - DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate); - return new CommitTransactionPayload(out.toByteArray()); - } - - public Entry getCandidate() throws IOException { - final DataInput in = ByteStreams.newDataInput(serialized); - return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), - DataTreeCandidateInputOutput.readDataTreeCandidate(in)); - } - - @Override - public int size() { - return serialized.length; - } - - private Object writeReplace() { - return new Proxy(serialized); - } }