cb261a2b19392da2373357383957a7f2cad4b4b5
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / persisted / CommitTransactionPayload.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.persisted;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static com.google.common.math.IntMath.ceilingPowerOfTwo;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.annotations.Beta;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.MoreObjects;
17 import com.google.common.io.ByteStreams;
18 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
19 import java.io.DataInput;
20 import java.io.DataInputStream;
21 import java.io.DataOutputStream;
22 import java.io.Externalizable;
23 import java.io.IOException;
24 import java.io.ObjectInput;
25 import java.io.ObjectOutput;
26 import java.io.Serializable;
27 import java.io.StreamCorruptedException;
28 import java.util.AbstractMap.SimpleImmutableEntry;
29 import java.util.Map.Entry;
30 import org.apache.commons.lang3.SerializationUtils;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
34 import org.opendaylight.controller.cluster.io.ChunkedByteArray;
35 import org.opendaylight.controller.cluster.io.ChunkedOutputStream;
36 import org.opendaylight.controller.cluster.raft.messages.IdentifiablePayload;
37 import org.opendaylight.yangtools.concepts.Either;
38 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
40 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Payload persisted when a transaction commits. It contains the transaction identifier and the
46  * {@link DataTreeCandidate}
47  *
48  * @author Robert Varga
49  */
50 @Beta
51 public abstract sealed class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
52         implements Serializable {
53     private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
54     private static final long serialVersionUID = 1L;
55
56     static final int MAX_ARRAY_SIZE = ceilingPowerOfTwo(Integer.getInteger(
57         "org.opendaylight.controller.cluster.datastore.persisted.max-array-size", 256 * 1024));
58
59     private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
60
61     CommitTransactionPayload() {
62         // hidden on purpose
63     }
64
65     public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
66             final DataTreeCandidate candidate, final PayloadVersion version, final int initialSerializedBufferCapacity)
67                     throws IOException {
68         final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity, MAX_ARRAY_SIZE);
69         try (DataOutputStream dos = new DataOutputStream(cos)) {
70             transactionId.writeTo(dos);
71             DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, version, candidate);
72         }
73
74         final Either<byte[], ChunkedByteArray> source = cos.toVariant();
75         LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size());
76         return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond());
77     }
78
79     @VisibleForTesting
80     public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
81             final DataTreeCandidate candidate, final PayloadVersion version) throws IOException {
82         return create(transactionId, candidate, version, 512);
83     }
84
85     @VisibleForTesting
86     public static @NonNull CommitTransactionPayload create(final TransactionIdentifier transactionId,
87             final DataTreeCandidate candidate) throws IOException {
88         return create(transactionId, candidate, PayloadVersion.current());
89     }
90
91     public @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate() throws IOException {
92         Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = candidate;
93         if (localCandidate == null) {
94             synchronized (this) {
95                 localCandidate = candidate;
96                 if (localCandidate == null) {
97                     candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
98                 }
99             }
100         }
101         return localCandidate;
102     }
103
104     public final @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate(
105             final ReusableStreamReceiver receiver) throws IOException {
106         final DataInput in = newDataInput();
107         return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
108                 DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
109     }
110
111     @Override
112     public TransactionIdentifier getIdentifier() {
113         try  {
114             return getCandidate().getKey();
115         } catch (IOException e) {
116             throw new IllegalStateException("Candidate deserialization failed.", e);
117         }
118     }
119
120     @Override
121     public final int serializedSize() {
122         // TODO: this is not entirely accurate as the the byte[] can be chunked by the serialization stream
123         return ProxySizeHolder.PROXY_SIZE + size();
124     }
125
126     /**
127      * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping
128      * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
129      * this was the last time the candidate was needed ant it is safe to be cleared.
130      */
131     public Entry<TransactionIdentifier, DataTreeCandidateWithVersion> acquireCandidate() throws IOException {
132         final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = getCandidate();
133         candidate = null;
134         return localCandidate;
135     }
136
137     @Override
138     public final String toString() {
139         final var helper = MoreObjects.toStringHelper(this);
140         final var localCandidate = candidate;
141         if (localCandidate != null) {
142             helper.add("identifier", candidate.getKey());
143         }
144         return helper.add("size", size()).toString();
145     }
146
147     abstract void writeBytes(ObjectOutput out) throws IOException;
148
149     abstract DataInput newDataInput();
150
151     @Override
152     protected final Object writeReplace() {
153         return new Proxy(this);
154     }
155
156     static final class Simple extends CommitTransactionPayload {
157         private static final long serialVersionUID = 1L;
158
159         private final byte[] serialized;
160
161         Simple(final byte[] serialized) {
162             this.serialized = requireNonNull(serialized);
163         }
164
165         @Override
166         public int size() {
167             return serialized.length;
168         }
169
170         @Override
171         DataInput newDataInput() {
172             return ByteStreams.newDataInput(serialized);
173         }
174
175         @Override
176         void writeBytes(final ObjectOutput out) throws IOException {
177             out.write(serialized);
178         }
179     }
180
181     static final class Chunked extends CommitTransactionPayload {
182         private static final long serialVersionUID = 1L;
183
184         @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy")
185         private final ChunkedByteArray source;
186
187         Chunked(final ChunkedByteArray source) {
188             this.source = requireNonNull(source);
189         }
190
191         @Override
192         void writeBytes(final ObjectOutput out) throws IOException {
193             source.copyTo(out);
194         }
195
196         @Override
197         public int size() {
198             return source.size();
199         }
200
201         @Override
202         DataInput newDataInput() {
203             return new DataInputStream(source.openStream());
204         }
205     }
206
207     // Exists to break initialization dependency between CommitTransactionPayload/Simple/Proxy
208     private static final class ProxySizeHolder {
209         static final int PROXY_SIZE = SerializationUtils.serialize(new Proxy(new Simple(new byte[0]))).length;
210
211         private ProxySizeHolder() {
212             // Hidden on purpose
213         }
214     }
215
216     private static final class Proxy implements Externalizable {
217         private static final long serialVersionUID = 1L;
218
219         private CommitTransactionPayload payload;
220
221         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
222         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
223         @SuppressWarnings("checkstyle:RedundantModifier")
224         public Proxy() {
225             // For Externalizable
226         }
227
228         Proxy(final CommitTransactionPayload payload) {
229             this.payload = requireNonNull(payload);
230         }
231
232         @Override
233         public void writeExternal(final ObjectOutput out) throws IOException {
234             out.writeInt(payload.size());
235             payload.writeBytes(out);
236         }
237
238         @Override
239         public void readExternal(final ObjectInput in) throws IOException {
240             final int length = in.readInt();
241             if (length < 0) {
242                 throw new StreamCorruptedException("Invalid payload length " + length);
243             } else if (length < MAX_ARRAY_SIZE) {
244                 final byte[] serialized = new byte[length];
245                 in.readFully(serialized);
246                 payload = new Simple(serialized);
247             } else {
248                 payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE));
249             }
250         }
251
252         private Object readResolve() {
253             return verifyNotNull(payload);
254         }
255     }
256 }