2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.persisted;
10 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.annotations.Beta;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.MoreObjects;
16 import com.google.common.io.ByteStreams;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.io.DataInput;
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.IOException;
22 import java.io.ObjectOutput;
23 import java.io.Serializable;
24 import org.apache.commons.lang3.SerializationUtils;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
28 import org.opendaylight.controller.cluster.io.ChunkedByteArray;
29 import org.opendaylight.controller.cluster.io.ChunkedOutputStream;
30 import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
31 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
32 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
33 import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
34 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * Payload persisted when a transaction commits. It contains the transaction identifier and the
40 * {@link DataTreeCandidate}
42 * @author Robert Varga
45 public abstract sealed class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
46 implements Serializable {
48 public record CandidateTransaction(
49 TransactionIdentifier transactionId,
50 DataTreeCandidate candidate,
51 NormalizedNodeStreamVersion streamVersion) {
52 public CandidateTransaction {
53 requireNonNull(transactionId);
54 requireNonNull(candidate);
55 requireNonNull(streamVersion);
59 private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
60 private static final long serialVersionUID = 1L;
62 static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
63 "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
65 private volatile CandidateTransaction candidate = null;
67 private CommitTransactionPayload() {
71 public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
72 final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity)
74 final var cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE);
75 try (var dos = new DataOutputStream(cos)) {
76 transactionId.writeTo(dos);
77 DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate);
80 final var source = cos.toVariant();
81 LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size());
82 return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond());
86 public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
87 final DataTreeCandidate candidate, final PayloadVersion version) throws IOException {
88 return create(transactionId, candidate, version, 512);
92 public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
93 final DataTreeCandidate candidate) throws IOException {
94 return create(transactionId, candidate, PayloadVersion.current());
97 public @NonNull CandidateTransaction getCandidate() throws IOException {
98 var localCandidate = candidate;
99 if (localCandidate == null) {
100 synchronized (this) {
101 localCandidate = candidate;
102 if (localCandidate == null) {
103 candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
107 return localCandidate;
110 public final @NonNull CandidateTransaction getCandidate(final ReusableStreamReceiver receiver) throws IOException {
111 final var in = newDataInput();
112 final var transactionId = TransactionIdentifier.readFrom(in);
113 final var readCandidate = DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver);
115 return new CandidateTransaction(transactionId, readCandidate.candidate(), readCandidate.version());
119 public TransactionIdentifier getIdentifier() {
121 return getCandidate().transactionId();
122 } catch (IOException e) {
123 throw new IllegalStateException("Candidate deserialization failed.", e);
128 public final int serializedSize() {
129 // TODO: this is not entirely accurate as the the byte[] can be chunked by the serialization stream
130 return ProxySizeHolder.PROXY_SIZE + size();
134 * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping
135 * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
136 * this was the last time the candidate was needed ant it is safe to be cleared.
138 public @NonNull CandidateTransaction acquireCandidate() throws IOException {
139 final var localCandidate = getCandidate();
141 return localCandidate;
145 public final String toString() {
146 final var helper = MoreObjects.toStringHelper(this);
147 final var localCandidate = candidate;
148 if (localCandidate != null) {
149 helper.add("identifier", candidate.transactionId());
151 return helper.add("size", size()).toString();
154 abstract void writeBytes(ObjectOutput out) throws IOException;
156 abstract DataInput newDataInput();
159 public final Object writeReplace() {
163 static final class Simple extends CommitTransactionPayload {
165 private static final long serialVersionUID = 1L;
167 private final byte[] serialized;
169 Simple(final byte[] serialized) {
170 this.serialized = requireNonNull(serialized);
175 return serialized.length;
179 DataInput newDataInput() {
180 return ByteStreams.newDataInput(serialized);
184 void writeBytes(final ObjectOutput out) throws IOException {
185 out.write(serialized);
189 static final class Chunked extends CommitTransactionPayload {
191 private static final long serialVersionUID = 1L;
193 @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy")
194 private final ChunkedByteArray source;
196 Chunked(final ChunkedByteArray source) {
197 this.source = requireNonNull(source);
201 void writeBytes(final ObjectOutput out) throws IOException {
207 return source.size();
211 DataInput newDataInput() {
212 return new DataInputStream(source.openStream());
216 // Exists to break initialization dependency between CommitTransactionPayload/Simple/Proxy
217 private static final class ProxySizeHolder {
218 static final int PROXY_SIZE = SerializationUtils.serialize(new CT(new Simple(new byte[0]))).length;
220 private ProxySizeHolder() {