X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fpersisted%2FCommitTransactionPayload.java;h=fed348320defd0494318956c5f2e9d847e773af9;hp=f4ac854a2c6e1ac6d98d6bed7326429a4c7bcb14;hb=HEAD;hpb=e66759266dc43d5f58b2837aca5047b42c205e4a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index f4ac854a2c..45cbcc851a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -7,33 +7,31 @@ */ package org.opendaylight.controller.cluster.datastore.persisted; -import static com.google.common.base.Verify.verifyNotNull; +import static com.google.common.math.IntMath.ceilingPowerOfTwo; import static java.util.Objects.requireNonNull; -import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE; import com.google.common.annotations.Beta; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import com.google.common.io.ByteStreams; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; -import java.io.StreamCorruptedException; -import java.util.AbstractMap.SimpleImmutableEntry; -import java.util.Map.Entry; +import org.apache.commons.lang3.SerializationUtils; import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.yangtools.concepts.Variant; +import org.opendaylight.controller.cluster.io.ChunkedByteArray; +import org.opendaylight.controller.cluster.io.ChunkedOutputStream; +import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,24 +42,42 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public abstract class CommitTransactionPayload extends Payload implements Serializable { +public abstract sealed class CommitTransactionPayload extends IdentifiablePayload + implements Serializable { + @NonNullByDefault + public record CandidateTransaction( + TransactionIdentifier transactionId, + DataTreeCandidate candidate, + NormalizedNodeStreamVersion streamVersion) { + public CandidateTransaction { + requireNonNull(transactionId); + requireNonNull(candidate); + requireNonNull(streamVersion); + } + } + private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); private static final long serialVersionUID = 1L; - CommitTransactionPayload() { + static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( + "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024)); + + private volatile CandidateTransaction candidate = null; + private CommitTransactionPayload() { + // hidden on purpose } public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity) throws IOException { - final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity); - try (DataOutputStream dos = new DataOutputStream(cos)) { + final var cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); + try (var dos = new DataOutputStream(cos)) { transactionId.writeTo(dos); DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate); } - final Variant source = cos.toVariant(); + final var source = cos.toVariant(); LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size()); return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond()); } @@ -78,26 +94,74 @@ public abstract class CommitTransactionPayload extends Payload implements Serial return create(transactionId, candidate, PayloadVersion.current()); } - public @NonNull Entry getCandidate() throws IOException { - return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + public @NonNull CandidateTransaction getCandidate() throws IOException { + var localCandidate = candidate; + if (localCandidate == null) { + synchronized (this) { + localCandidate = candidate; + if (localCandidate == null) { + candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + } + } + } + return localCandidate; + } + + public final @NonNull CandidateTransaction getCandidate(final ReusableStreamReceiver receiver) throws IOException { + final var in = newDataInput(); + final var transactionId = TransactionIdentifier.readFrom(in); + final var readCandidate = DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver); + + return new CandidateTransaction(transactionId, readCandidate.candidate(), readCandidate.version()); + } + + @Override + public TransactionIdentifier getIdentifier() { + try { + return getCandidate().transactionId(); + } catch (IOException e) { + throw new IllegalStateException("Candidate deserialization failed.", e); + } + } + + @Override + public final int serializedSize() { + // TODO: this is not entirely accurate as the the byte[] can be chunked by the serialization stream + return ProxySizeHolder.PROXY_SIZE + size(); + } + + /** + * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping + * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that + * this was the last time the candidate was needed ant it is safe to be cleared. + */ + public @NonNull CandidateTransaction acquireCandidate() throws IOException { + final var localCandidate = getCandidate(); + candidate = null; + return localCandidate; } - public final @NonNull Entry getCandidate( - final ReusableStreamReceiver receiver) throws IOException { - final DataInput in = newDataInput(); - return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), - DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); + @Override + public final String toString() { + final var helper = MoreObjects.toStringHelper(this); + final var localCandidate = candidate; + if (localCandidate != null) { + helper.add("identifier", candidate.transactionId()); + } + return helper.add("size", size()).toString(); } abstract void writeBytes(ObjectOutput out) throws IOException; abstract DataInput newDataInput(); - final Object writeReplace() { - return new Proxy(this); + @Override + public final Object writeReplace() { + return new CT(this); } - private static final class Simple extends CommitTransactionPayload { + static final class Simple extends CommitTransactionPayload { + @java.io.Serial private static final long serialVersionUID = 1L; private final byte[] serialized; @@ -122,7 +186,8 @@ public abstract class CommitTransactionPayload extends Payload implements Serial } } - private static final class Chunked extends CommitTransactionPayload { + static final class Chunked extends CommitTransactionPayload { + @java.io.Serial private static final long serialVersionUID = 1L; @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy") @@ -148,44 +213,12 @@ public abstract class CommitTransactionPayload extends Payload implements Serial } } - private static final class Proxy implements Externalizable { - private static final long serialVersionUID = 1L; - - private CommitTransactionPayload payload; - - // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't - // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. - @SuppressWarnings("checkstyle:RedundantModifier") - public Proxy() { - // For Externalizable - } - - Proxy(final CommitTransactionPayload payload) { - this.payload = requireNonNull(payload); - } - - @Override - public void writeExternal(final ObjectOutput out) throws IOException { - out.writeInt(payload.size()); - payload.writeBytes(out); - } - - @Override - public void readExternal(final ObjectInput in) throws IOException { - final int length = in.readInt(); - if (length < 0) { - throw new StreamCorruptedException("Invalid payload length " + length); - } else if (length < MAX_ARRAY_SIZE) { - final byte[] serialized = new byte[length]; - in.readFully(serialized); - payload = new Simple(serialized); - } else { - payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE)); - } - } + // Exists to break initialization dependency between CommitTransactionPayload/Simple/Proxy + private static final class ProxySizeHolder { + static final int PROXY_SIZE = SerializationUtils.serialize(new CT(new Simple(new byte[0]))).length; - private Object readResolve() { - return verifyNotNull(payload); + private ProxySizeHolder() { + // Hidden on purpose } } }