X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fpersisted%2FCommitTransactionPayload.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fpersisted%2FCommitTransactionPayload.java;h=45cbcc851a80ead2bf0508d024a61f2cd6cc3a09;hb=d6f72566c1d394cf5717ce9757693abd80c70c02;hp=c8639bbebbf9ab77db2ce445411fee4b53a9e5e5;hpb=2732830492deefff5bac96a3805118c54b79df15;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index c8639bbebb..45cbcc851a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -21,17 +21,15 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.ObjectOutput; import java.io.Serializable; -import java.util.AbstractMap.SimpleImmutableEntry; -import java.util.Map.Entry; import org.apache.commons.lang3.SerializationUtils; import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion; import org.opendaylight.controller.cluster.io.ChunkedByteArray; import org.opendaylight.controller.cluster.io.ChunkedOutputStream; import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload; -import org.opendaylight.yangtools.concepts.Either; import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver; +import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; @@ -46,28 +44,40 @@ import org.slf4j.LoggerFactory; @Beta public abstract sealed class CommitTransactionPayload extends IdentifiablePayload implements Serializable { + @NonNullByDefault + public record CandidateTransaction( + TransactionIdentifier transactionId, + DataTreeCandidate candidate, + NormalizedNodeStreamVersion streamVersion) { + public CandidateTransaction { + requireNonNull(transactionId); + requireNonNull(candidate); + requireNonNull(streamVersion); + } + } + private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class); private static final long serialVersionUID = 1L; static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger( "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024)); - private volatile Entry candidate = null; + private volatile CandidateTransaction candidate = null; - CommitTransactionPayload() { + private CommitTransactionPayload() { // hidden on purpose } public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId, final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity) throws IOException { - final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); - try (DataOutputStream dos = new DataOutputStream(cos)) { + final var cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE); + try (var dos = new DataOutputStream(cos)) { transactionId.writeTo(dos); DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate); } - final Either source = cos.toVariant(); + final var source = cos.toVariant(); LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size()); return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond()); } @@ -84,8 +94,8 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa return create(transactionId, candidate, PayloadVersion.current()); } - public @NonNull Entry getCandidate() throws IOException { - Entry localCandidate = candidate; + public @NonNull CandidateTransaction getCandidate() throws IOException { + var localCandidate = candidate; if (localCandidate == null) { synchronized (this) { localCandidate = candidate; @@ -97,17 +107,18 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa return localCandidate; } - public final @NonNull Entry getCandidate( - final ReusableStreamReceiver receiver) throws IOException { - final DataInput in = newDataInput(); - return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), - DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver)); + public final @NonNull CandidateTransaction getCandidate(final ReusableStreamReceiver receiver) throws IOException { + final var in = newDataInput(); + final var transactionId = TransactionIdentifier.readFrom(in); + final var readCandidate = DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver); + + return new CandidateTransaction(transactionId, readCandidate.candidate(), readCandidate.version()); } @Override public TransactionIdentifier getIdentifier() { try { - return getCandidate().getKey(); + return getCandidate().transactionId(); } catch (IOException e) { throw new IllegalStateException("Candidate deserialization failed.", e); } @@ -124,8 +135,8 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that * this was the last time the candidate was needed ant it is safe to be cleared. */ - public Entry acquireCandidate() throws IOException { - final Entry localCandidate = getCandidate(); + public @NonNull CandidateTransaction acquireCandidate() throws IOException { + final var localCandidate = getCandidate(); candidate = null; return localCandidate; } @@ -135,7 +146,7 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa final var helper = MoreObjects.toStringHelper(this); final var localCandidate = candidate; if (localCandidate != null) { - helper.add("identifier", candidate.getKey()); + helper.add("identifier", candidate.transactionId()); } return helper.add("size", size()).toString(); }