Expose stream version used for DataTreeCandidate stream
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / persisted / CommitTransactionPayload.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.persisted;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12 import static org.opendaylight.controller.cluster.datastore.persisted.ChunkedOutputStream.MAX_ARRAY_SIZE;
13
14 import com.google.common.annotations.Beta;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.io.ByteStreams;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.io.DataInput;
19 import java.io.DataInputStream;
20 import java.io.DataOutputStream;
21 import java.io.Externalizable;
22 import java.io.IOException;
23 import java.io.ObjectInput;
24 import java.io.ObjectOutput;
25 import java.io.Serializable;
26 import java.io.StreamCorruptedException;
27 import java.util.AbstractMap.SimpleImmutableEntry;
28 import java.util.Map.Entry;
29 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
30 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
31 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
32 import org.opendaylight.yangtools.concepts.Variant;
33 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
35 import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Payload persisted when a transaction commits. It contains the transaction identifier and the
41  * {@link DataTreeCandidate}
42  *
43  * @author Robert Varga
44  */
45 @Beta
46 public abstract class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
47         implements Serializable {
48     private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
49     private static final long serialVersionUID = 1L;
50
51     private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
52
53     CommitTransactionPayload() {
54
55     }
56
57     public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
58             final DataTreeCandidate candidate, final int initialSerializedBufferCapacity) throws IOException {
59
60         final ChunkedOutputStream cos = new ChunkedOutputStream(initialSerializedBufferCapacity);
61         try (DataOutputStream dos = new DataOutputStream(cos)) {
62             transactionId.writeTo(dos);
63             DataTreeCandidateInputOutput.writeDataTreeCandidate(dos, candidate);
64         }
65
66         final Variant<byte[], ChunkedByteArray> source = cos.toVariant();
67         LOG.debug("Initial buffer capacity {}, actual serialized size {}", initialSerializedBufferCapacity, cos.size());
68         return source.isFirst() ? new Simple(source.getFirst()) : new Chunked(source.getSecond());
69     }
70
71     @VisibleForTesting
72     public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
73             final DataTreeCandidate candidate) throws IOException {
74         return create(transactionId, candidate, 512);
75     }
76
77     public Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate() throws IOException {
78         Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = candidate;
79         if (localCandidate == null) {
80             synchronized (this) {
81                 localCandidate = candidate;
82                 if (localCandidate == null) {
83                     candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
84                 }
85             }
86         }
87         return localCandidate;
88     }
89
90     public final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate(
91             final ReusableStreamReceiver receiver) throws IOException {
92         final DataInput in = newDataInput();
93         return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
94                 DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
95     }
96
97     @Override
98     public TransactionIdentifier getIdentifier() {
99         try  {
100             return getCandidate().getKey();
101         } catch (IOException e) {
102             throw new IllegalStateException("Candidate deserialization failed.", e);
103         }
104     }
105
106     abstract void writeBytes(ObjectOutput out) throws IOException;
107
108     abstract DataInput newDataInput();
109
110     final Object writeReplace() {
111         return new Proxy(this);
112     }
113
114     private static final class Simple extends CommitTransactionPayload {
115         private static final long serialVersionUID = 1L;
116
117         private final byte[] serialized;
118
119         Simple(final byte[] serialized) {
120             this.serialized = requireNonNull(serialized);
121         }
122
123         @Override
124         public int size() {
125             return serialized.length;
126         }
127
128         @Override
129         DataInput newDataInput() {
130             return ByteStreams.newDataInput(serialized);
131         }
132
133         @Override
134         void writeBytes(final ObjectOutput out) throws IOException {
135             out.write(serialized);
136         }
137     }
138
139     private static final class Chunked extends CommitTransactionPayload {
140         private static final long serialVersionUID = 1L;
141
142         @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Handled via serialization proxy")
143         private final ChunkedByteArray source;
144
145         Chunked(final ChunkedByteArray source) {
146             this.source = requireNonNull(source);
147         }
148
149         @Override
150         void writeBytes(final ObjectOutput out) throws IOException {
151             source.copyTo(out);
152         }
153
154         @Override
155         public int size() {
156             return source.size();
157         }
158
159         @Override
160         DataInput newDataInput() {
161             return new DataInputStream(source.openStream());
162         }
163     }
164
165     private static final class Proxy implements Externalizable {
166         private static final long serialVersionUID = 1L;
167
168         private CommitTransactionPayload payload;
169
170         // checkstyle flags the public modifier as redundant which really doesn't make sense since it clearly isn't
171         // redundant. It is explicitly needed for Java serialization to be able to create instances via reflection.
172         @SuppressWarnings("checkstyle:RedundantModifier")
173         public Proxy() {
174             // For Externalizable
175         }
176
177         Proxy(final CommitTransactionPayload payload) {
178             this.payload = requireNonNull(payload);
179         }
180
181         @Override
182         public void writeExternal(final ObjectOutput out) throws IOException {
183             out.writeInt(payload.size());
184             payload.writeBytes(out);
185         }
186
187         @Override
188         public void readExternal(final ObjectInput in) throws IOException {
189             final int length = in.readInt();
190             if (length < 0) {
191                 throw new StreamCorruptedException("Invalid payload length " + length);
192             } else if (length < MAX_ARRAY_SIZE) {
193                 final byte[] serialized = new byte[length];
194                 in.readFully(serialized);
195                 payload = new Simple(serialized);
196             } else {
197                 payload = new Chunked(ChunkedByteArray.readFrom(in, length, MAX_ARRAY_SIZE));
198             }
199         }
200
201         private Object readResolve() {
202             return verifyNotNull(payload);
203         }
204     }
205 }