X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fpersisted%2FCommitTransactionPayload.java;h=fed348320defd0494318956c5f2e9d847e773af9;hp=4cbe151a6d8337f6f607ce26e3f6d087d2912148;hb=HEAD;hpb=0f88bd70a92ec2b536b0633a6ba4e6733cee475d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index 4cbe151a6d..45cbcc851a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.datastore.persisted; -import static com.google.common.base.Verify.verifyNotNull; import static com.google.common.math.IntMath.ceilingPowerOfTwo; import static java.util.Objects.requireNonNull; @@ -19,24 +18,18 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; -import java.io.StreamCorruptedException; -import java.util.AbstractMap.SimpleImmutableEntry; -import java.util.Map.Entry; import org.apache.commons.lang3.SerializationUtils; import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; import org.opendaylight.controller.cluster.io.ChunkedByteArray; import org.opendaylight.controller.cluster.io.ChunkedOutputStream; import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload; -import org.opendaylight.controller.cluster.raft.persisted.LegacySerializable; -import org.opendaylight.yangtools.concepts.Either; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; @@ -51,28 +44,40 @@ import org.slf4j.LoggerFactory; @Beta public abstract sealed class CommitTransactionPayload extends IdentifiablePayload implements Serializable { + @NonNullByDefault + public record CandidateTransaction( + TransactionIdentifier transactionId, + DataTreeCandidate candidate, + NormalizedNodeStreamVersion streamVersion) { + public CandidateTransaction { + requireNonNull(transactionId); + requireNonNull(candidate); + requireNonNull(streamVersion); + } + } + private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); private static final long serialVersionUID = 1L; static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024)); - private volatile Entry candidate = null; + private volatile CandidateTransaction candidate = null; - CommitTransactionPayload() { + private CommitTransactionPayload() { // hidden on purpose } public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity) throws IOException { - final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); - try (DataOutputStream dos = new DataOutputStream(cos)) { + final var cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); + try (var dos = new DataOutputStream(cos)) { transactionId.writeTo(dos); DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate); } - final Either source = cos.toVariant(); + final var source = cos.toVariant(); LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size()); return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond()); } @@ -89,8 +94,8 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa return create(transactionId, candidate, PayloadVersion.current()); } - public @NonNull Entry getCandidate() throws IOException { - Entry localCandidate = candidate; + public @NonNull CandidateTransaction getCandidate() throws IOException { + var localCandidate = candidate; if (localCandidate == null) { synchronized (this) { localCandidate = candidate; @@ -102,17 +107,18 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa return localCandidate; } - public final @NonNull Entry getCandidate( - final ReusableStreamReceiver receiver) throws IOException { - final DataInput in = newDataInput(); - return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), - DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); + public final @NonNull CandidateTransaction getCandidate(final ReusableStreamReceiver receiver) throws IOException { + final var in = newDataInput(); + final var transactionId = TransactionIdentifier.readFrom(in); + final var readCandidate = DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver); + + return new CandidateTransaction(transactionId, readCandidate.candidate(), readCandidate.version()); } @Override public TransactionIdentifier getIdentifier() { try { - return getCandidate().getKey(); + return getCandidate().transactionId(); } catch (IOException e) { throw new IllegalStateException("Candidate deserialization failed.", e); } @@ -129,8 +135,8 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that * this was the last time the candidate was needed ant it is safe to be cleared. */ - public Entry acquireCandidate() throws IOException { - final Entry localCandidate = getCandidate(); + public @NonNull CandidateTransaction acquireCandidate() throws IOException { + final var localCandidate = getCandidate(); candidate = null; return localCandidate; } @@ -140,7 +146,7 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa final var helper = MoreObjects.toStringHelper(this); final var localCandidate = candidate; if (localCandidate != null) { - helper.add("identifier", candidate.getKey()); + helper.add("identifier", candidate.transactionId()); } return helper.add("size", size()).toString(); } @@ -154,7 +160,7 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa return new CT(this); } - static sealed class Simple extends CommitTransactionPayload { + static final class Simple extends CommitTransactionPayload { @java.io.Serial private static final long serialVersionUID = 1L; @@ -180,7 +186,7 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa } } - static sealed class Chunked extends CommitTransactionPayload { + static final class Chunked extends CommitTransactionPayload { @java.io.Serial private static final long serialVersionUID = 1L; @@ -215,68 +221,4 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa // Hidden on purpose } } - - @Deprecated(since = "7.0.0", forRemoval = true) - private static final class SimpleMagnesium extends Simple implements LegacySerializable { - @java.io.Serial - private static final long serialVersionUID = 1L; - - SimpleMagnesium(final byte[] serialized) { - super(serialized); - } - } - - @Deprecated(since = "7.0.0", forRemoval = true) - private static final class ChunkedMagnesium extends Chunked implements LegacySerializable { - @java.io.Serial - private static final long serialVersionUID = 1L; - - ChunkedMagnesium(final ChunkedByteArray source) { - super(source); - } - } - - @Deprecated(since = "7.0.0", forRemoval = true) - private static final class Proxy implements Externalizable { - @java.io.Serial - private static final long serialVersionUID = 1L; - - private CommitTransactionPayload payload; - - // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't - // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection. - @SuppressWarnings("checkstyle:RedundantModifier") - public Proxy() { - // For Externalizable - } - - Proxy(final CommitTransactionPayload payload) { - this.payload = requireNonNull(payload); - } - - @Override - public void writeExternal(final ObjectOutput out) throws IOException { - out.writeInt(payload.size()); - payload.writeBytes(out); - } - - @Override - public void readExternal(final ObjectInput in) throws IOException { - final int length = in.readInt(); - if (length < 0) { - throw new StreamCorruptedException("Invalid payload length " + length); - } else if (length < MAX_ARRAY_SIZE) { - final byte[] serialized = new byte[length]; - in.readFully(serialized); - payload = new SimpleMagnesium(serialized); - } else { - payload = new ChunkedMagnesium(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE)); - } - } - - @java.io.Serial - private Object readResolve() { - return verifyNotNull(payload); - } - } }