X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fpersisted%2FCommitTransactionPayload.java;h=fed348320defd0494318956c5f2e9d847e773af9;hb=HEAD;hp=6749244fbbbae53599d2c276e2193285f493c2a7;hpb=6016f3caa10acd06e9ebd2f813851ef1c9539176;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index 6749244fbb..45cbcc851a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.datastore.persisted; -import static com.google.common.base.Verify.verifyNotNull; import static com.google.common.math.IntMath.ceilingPowerOfTwo; import static java.util.Objects.requireNonNull; @@ -19,22 +18,18 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.Externalizable; import java.io.IOException; -import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; -import java.io.StreamCorruptedException; -import java.util.AbstractMap.SimpleImmutableEntry; -import java.util.Map.Entry; +import org.apache.commons.lang3.SerializationUtils; import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; import org.opendaylight.controller.cluster.io.ChunkedByteArray; import org.opendaylight.controller.cluster.io.ChunkedOutputStream; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload; -import org.opendaylight.yangtools.concepts.Either; +import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; @@ -47,30 +42,42 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public abstract class CommitTransactionPayload extends IdentifiablePayload +public abstract sealed class CommitTransactionPayload extends IdentifiablePayload implements Serializable { + @NonNullByDefault + public record CandidateTransaction( + TransactionIdentifier transactionId, + DataTreeCandidate candidate, + NormalizedNodeStreamVersion streamVersion) { + public CandidateTransaction { + requireNonNull(transactionId); + requireNonNull(candidate); + requireNonNull(streamVersion); + } + } + private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); private static final long serialVersionUID = 1L; - private static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( + static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024)); - private volatile Entry candidate = null; + private volatile CandidateTransaction candidate = null; - CommitTransactionPayload() { + private CommitTransactionPayload() { // hidden on purpose } public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity) throws IOException { - final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); - try (DataOutputStream dos = new DataOutputStream(cos)) { + final var cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); + try (var dos = new DataOutputStream(cos)) { transactionId.writeTo(dos); DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate); } - final Either source = cos.toVariant(); + final var source = cos.toVariant(); LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size()); return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond()); } @@ -87,8 +94,8 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload getCandidate() throws IOException { - Entry localCandidate = candidate; + public @NonNull CandidateTransaction getCandidate() throws IOException { + var localCandidate = candidate; if (localCandidate == null) { synchronized (this) { localCandidate = candidate; @@ -100,29 +107,36 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload getCandidate( - final ReusableStreamReceiver receiver) throws IOException { - final DataInput in = newDataInput(); - return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), - DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); + public final @NonNull CandidateTransaction getCandidate(final ReusableStreamReceiver receiver) throws IOException { + final var in = newDataInput(); + final var transactionId = TransactionIdentifier.readFrom(in); + final var readCandidate = DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver); + + return new CandidateTransaction(transactionId, readCandidate.candidate(), readCandidate.version()); } @Override public TransactionIdentifier getIdentifier() { try { - return getCandidate().getKey(); + return getCandidate().transactionId(); } catch (IOException e) { throw new IllegalStateException("Candidate deserialization failed.", e); } } + @Override + public final int serializedSize() { + // TODO: this is not entirely accurate as the the byte[] can be chunked by the serialization stream + return ProxySizeHolder.PROXY_SIZE + size(); + } + /** * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that * this was the last time the candidate was needed ant it is safe to be cleared. */ - public Entry acquireCandidate() throws IOException { - final Entry localCandidate = getCandidate(); + public @NonNull CandidateTransaction acquireCandidate() throws IOException { + final var localCandidate = getCandidate(); candidate = null; return localCandidate; } @@ -132,7 +146,7 @@ public abstract class CommitTransactionPayload extends IdentifiablePayload