Introduce CommitTransactionPayload.CandidateTransaction
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / persisted / CommitTransactionPayload.java
index c8639bbebbf9ab77db2ce445411fee4b53a9e5e5..45cbcc851a80ead2bf0508d024a61f2cd6cc3a09 100644 (file)
@@ -21,17 +21,15 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutput;
 import java.io.Serializable;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.Map.Entry;
 import org.apache.commons.lang3.SerializationUtils;
 import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
 import org.opendaylight.controller.cluster.io.ChunkedByteArray;
 import org.opendaylight.controller.cluster.io.ChunkedOutputStream;
 import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
-import org.opendaylight.yangtools.concepts.Either;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
+import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
 import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
 import org.slf4j.Logger;
@@ -46,28 +44,40 @@ import org.slf4j.LoggerFactory;
 @Beta
 public abstract sealed class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
         implements Serializable {
+    @NonNullByDefault
+    public record CandidateTransaction(
+            TransactionIdentifier transactionId,
+            DataTreeCandidate candidate,
+            NormalizedNodeStreamVersion streamVersion) {
+        public CandidateTransaction {
+            requireNonNull(transactionId);
+            requireNonNull(candidate);
+            requireNonNull(streamVersion);
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
     private static final long serialVersionUID = 1L;
 
     static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
         "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
 
-    private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
+    private volatile CandidateTransaction candidate = null;
 
-    CommitTransactionPayload() {
+    private CommitTransactionPayload() {
         // hidden on purpose
     }
 
     public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
             final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity)
                     throws IOException {
-        final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE);
-        try (DataOutputStream dos = new DataOutputStream(cos)) {
+        final var cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE);
+        try (var dos = new DataOutputStream(cos)) {
             transactionId.writeTo(dos);
             DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate);
         }
 
-        final Either<byte[], ChunkedByteArray> source = cos.toVariant();
+        final var source = cos.toVariant();
         LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size());
         return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond());
     }
@@ -84,8 +94,8 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa
         return create(transactionId, candidate, PayloadVersion.current());
     }
 
-    public @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate() throws IOException {
-        Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = candidate;
+    public @NonNull CandidateTransaction getCandidate() throws IOException {
+        var localCandidate = candidate;
         if (localCandidate == null) {
             synchronized (this) {
                 localCandidate = candidate;
@@ -97,17 +107,18 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa
         return localCandidate;
     }
 
-    public final @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate(
-            final ReusableStreamReceiver receiver) throws IOException {
-        final DataInput in = newDataInput();
-        return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
-                DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
+    public final @NonNull CandidateTransaction getCandidate(final ReusableStreamReceiver receiver) throws IOException {
+        final var in = newDataInput();
+        final var transactionId = TransactionIdentifier.readFrom(in);
+        final var readCandidate = DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver);
+
+        return new CandidateTransaction(transactionId, readCandidate.candidate(), readCandidate.version());
     }
 
     @Override
     public TransactionIdentifier getIdentifier() {
         try  {
-            return getCandidate().getKey();
+            return getCandidate().transactionId();
         } catch (IOException e) {
             throw new IllegalStateException("Candidate deserialization failed.", e);
         }
@@ -124,8 +135,8 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa
      * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
      * this was the last time the candidate was needed ant it is safe to be cleared.
      */
-    public Entry<TransactionIdentifier, DataTreeCandidateWithVersion> acquireCandidate() throws IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = getCandidate();
+    public @NonNull CandidateTransaction acquireCandidate() throws IOException {
+        final var localCandidate = getCandidate();
         candidate = null;
         return localCandidate;
     }
@@ -135,7 +146,7 @@ public abstract sealed class CommitTransactionPayload extends IdentifiablePayloa
         final var helper = MoreObjects.toStringHelper(this);
         final var localCandidate = candidate;
         if (localCandidate != null) {
-            helper.add("identifier", candidate.getKey());
+            helper.add("identifier", candidate.transactionId());
         }
         return helper.add("size", size()).toString();
     }