BUG 4589 : Handle writing and reading large strings
[controller.git] / opendaylight / md-sal / sal-clustering-commons / src / main / java / org / opendaylight / controller / cluster / datastore / node / utils / stream / NormalizedNodeOutputStreamWriter.java
1 /*
2  *
3  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  *  This program and the accompanying materials are made available under the
6  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  *  and is available at http://www.eclipse.org/legal/epl-v10.html
8  *
9  */
10
11 package org.opendaylight.controller.cluster.datastore.node.utils.stream;
12
13 import com.google.common.base.Preconditions;
14 import java.io.DataOutput;
15 import java.io.DataOutputStream;
16 import java.io.IOException;
17 import java.io.OutputStream;
18 import java.nio.charset.StandardCharsets;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Set;
23 import org.opendaylight.yangtools.yang.common.QName;
24 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
25 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
26 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
27 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
33  * a stream.
34  * A stream writer wrapper around this class will write node objects to stream in recursive manner.
35  * for example - If you have a ContainerNode which has a two LeafNode as children, then
36  * you will first call {@link #startContainerNode(YangInstanceIdentifier.NodeIdentifier, int)}, then will call
37  * {@link #leafNode(YangInstanceIdentifier.NodeIdentifier, Object)} twice and then, {@link #endNode()} to end
38  * container node.
39  *
40  * Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
41  * while reading.
42  *
43  *
44  */
45
46 public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWriter {
47
48     private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
49
50     static final byte SIGNATURE_MARKER = (byte) 0xab;
51     static final short CURRENT_VERSION = (short) 1;
52
53     static final byte IS_CODE_VALUE = 1;
54     static final byte IS_STRING_VALUE = 2;
55     static final byte IS_NULL_VALUE = 3;
56
57     private final DataOutput output;
58
59     private final Map<String, Integer> stringCodeMap = new HashMap<>();
60
61     private NormalizedNodeWriter normalizedNodeWriter;
62
63     private boolean wroteSignatureMarker;
64
65     public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
66         Preconditions.checkNotNull(stream);
67         output = new DataOutputStream(stream);
68     }
69
70     public NormalizedNodeOutputStreamWriter(DataOutput output) {
71         this.output = Preconditions.checkNotNull(output);
72     }
73
74     private NormalizedNodeWriter normalizedNodeWriter() {
75         if(normalizedNodeWriter == null) {
76             normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(this);
77         }
78
79         return normalizedNodeWriter;
80     }
81
82     public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
83         writeSignatureMarkerAndVersionIfNeeded();
84         normalizedNodeWriter().write(node);
85     }
86
87     private void writeSignatureMarkerAndVersionIfNeeded() throws IOException {
88         if(!wroteSignatureMarker) {
89             output.writeByte(SIGNATURE_MARKER);
90             output.writeShort(CURRENT_VERSION);
91             wroteSignatureMarker = true;
92         }
93     }
94
95     @Override
96     public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
97         Preconditions.checkNotNull(name, "Node identifier should not be null");
98         LOG.debug("Writing a new leaf node");
99         startNode(name.getNodeType(), NodeTypes.LEAF_NODE);
100
101         writeObject(value);
102     }
103
104     @Override
105     public void startLeafSet(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
106         Preconditions.checkNotNull(name, "Node identifier should not be null");
107         LOG.debug("Starting a new leaf set");
108
109         startNode(name.getNodeType(), NodeTypes.LEAF_SET);
110     }
111
112     @Override
113     public void leafSetEntryNode(Object value) throws IOException, IllegalArgumentException {
114         LOG.debug("Writing a new leaf set entry node");
115
116         output.writeByte(NodeTypes.LEAF_SET_ENTRY_NODE);
117         writeObject(value);
118     }
119
120     @Override
121     public void startContainerNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
122         Preconditions.checkNotNull(name, "Node identifier should not be null");
123
124         LOG.debug("Starting a new container node");
125
126         startNode(name.getNodeType(), NodeTypes.CONTAINER_NODE);
127     }
128
129     @Override
130     public void startUnkeyedList(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
131         Preconditions.checkNotNull(name, "Node identifier should not be null");
132         LOG.debug("Starting a new unkeyed list");
133
134         startNode(name.getNodeType(), NodeTypes.UNKEYED_LIST);
135     }
136
137     @Override
138     public void startUnkeyedListItem(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalStateException {
139         Preconditions.checkNotNull(name, "Node identifier should not be null");
140         LOG.debug("Starting a new unkeyed list item");
141
142         startNode(name.getNodeType(), NodeTypes.UNKEYED_LIST_ITEM);
143     }
144
145     @Override
146     public void startMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
147         Preconditions.checkNotNull(name, "Node identifier should not be null");
148         LOG.debug("Starting a new map node");
149
150         startNode(name.getNodeType(), NodeTypes.MAP_NODE);
151     }
152
153     @Override
154     public void startMapEntryNode(YangInstanceIdentifier.NodeIdentifierWithPredicates identifier, int childSizeHint) throws IOException, IllegalArgumentException {
155         Preconditions.checkNotNull(identifier, "Node identifier should not be null");
156         LOG.debug("Starting a new map entry node");
157         startNode(identifier.getNodeType(), NodeTypes.MAP_ENTRY_NODE);
158
159         writeKeyValueMap(identifier.getKeyValues());
160
161     }
162
163     @Override
164     public void startOrderedMapNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
165         Preconditions.checkNotNull(name, "Node identifier should not be null");
166         LOG.debug("Starting a new ordered map node");
167
168         startNode(name.getNodeType(), NodeTypes.ORDERED_MAP_NODE);
169     }
170
171     @Override
172     public void startChoiceNode(YangInstanceIdentifier.NodeIdentifier name, int childSizeHint) throws IOException, IllegalArgumentException {
173         Preconditions.checkNotNull(name, "Node identifier should not be null");
174         LOG.debug("Starting a new choice node");
175
176         startNode(name.getNodeType(), NodeTypes.CHOICE_NODE);
177     }
178
179     @Override
180     public void startAugmentationNode(YangInstanceIdentifier.AugmentationIdentifier identifier) throws IOException, IllegalArgumentException {
181         Preconditions.checkNotNull(identifier, "Node identifier should not be null");
182         LOG.debug("Starting a new augmentation node");
183
184         output.writeByte(NodeTypes.AUGMENTATION_NODE);
185         writeQNameSet(identifier.getPossibleChildNames());
186     }
187
188     @Override
189     public void anyxmlNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
190         Preconditions.checkNotNull(name, "Node identifier should not be null");
191         LOG.debug("Writing a new xml node");
192
193         startNode(name.getNodeType(), NodeTypes.ANY_XML_NODE);
194
195         writeObject(value);
196     }
197
198     @Override
199     public void endNode() throws IOException, IllegalStateException {
200         LOG.debug("Ending the node");
201
202         output.writeByte(NodeTypes.END_NODE);
203     }
204
205     @Override
206     public void close() throws IOException {
207         flush();
208     }
209
210     @Override
211     public void flush() throws IOException {
212         if (output instanceof OutputStream) {
213             ((OutputStream)output).flush();
214         }
215     }
216
217     private void startNode(final QName qName, byte nodeType) throws IOException {
218
219         Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
220
221         writeSignatureMarkerAndVersionIfNeeded();
222
223         // First write the type of node
224         output.writeByte(nodeType);
225         // Write Start Tag
226         writeQName(qName);
227     }
228
229     private void writeQName(QName qName) throws IOException {
230
231         writeCodedString(qName.getLocalName());
232         writeCodedString(qName.getNamespace().toString());
233         writeCodedString(qName.getFormattedRevision());
234     }
235
236     private void writeCodedString(String key) throws IOException {
237         Integer value = stringCodeMap.get(key);
238         if(value != null) {
239             output.writeByte(IS_CODE_VALUE);
240             output.writeInt(value);
241         } else {
242             if(key != null) {
243                 output.writeByte(IS_STRING_VALUE);
244                 stringCodeMap.put(key, Integer.valueOf(stringCodeMap.size()));
245                 output.writeUTF(key);
246             } else {
247                 output.writeByte(IS_NULL_VALUE);
248             }
249         }
250     }
251
252     private void writeObjSet(Set<?> set) throws IOException {
253         if(!set.isEmpty()){
254             output.writeInt(set.size());
255             for(Object o : set){
256                 if(o instanceof String){
257                     writeCodedString(o.toString());
258                 } else {
259                     throw new IllegalArgumentException("Expected value type to be String but was : " +
260                         o.toString());
261                 }
262             }
263         } else {
264             output.writeInt(0);
265         }
266     }
267
268     public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
269         writeSignatureMarkerAndVersionIfNeeded();
270         writeYangInstanceIdentifierInternal(identifier);
271     }
272
273     private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException {
274         Collection<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
275         output.writeInt(pathArguments.size());
276
277         for(YangInstanceIdentifier.PathArgument pathArgument : pathArguments) {
278             writePathArgument(pathArgument);
279         }
280     }
281
282     public void writePathArgument(YangInstanceIdentifier.PathArgument pathArgument) throws IOException {
283
284         byte type = PathArgumentTypes.getSerializablePathArgumentType(pathArgument);
285
286         output.writeByte(type);
287
288         switch(type) {
289             case PathArgumentTypes.NODE_IDENTIFIER :
290
291                 YangInstanceIdentifier.NodeIdentifier nodeIdentifier =
292                     (YangInstanceIdentifier.NodeIdentifier) pathArgument;
293
294                 writeQName(nodeIdentifier.getNodeType());
295                 break;
296
297             case PathArgumentTypes.NODE_IDENTIFIER_WITH_PREDICATES:
298
299                 YangInstanceIdentifier.NodeIdentifierWithPredicates nodeIdentifierWithPredicates =
300                     (YangInstanceIdentifier.NodeIdentifierWithPredicates) pathArgument;
301                 writeQName(nodeIdentifierWithPredicates.getNodeType());
302
303                 writeKeyValueMap(nodeIdentifierWithPredicates.getKeyValues());
304                 break;
305
306             case PathArgumentTypes.NODE_IDENTIFIER_WITH_VALUE :
307
308                 YangInstanceIdentifier.NodeWithValue nodeWithValue =
309                     (YangInstanceIdentifier.NodeWithValue) pathArgument;
310
311                 writeQName(nodeWithValue.getNodeType());
312                 writeObject(nodeWithValue.getValue());
313                 break;
314
315             case PathArgumentTypes.AUGMENTATION_IDENTIFIER :
316
317                 YangInstanceIdentifier.AugmentationIdentifier augmentationIdentifier =
318                     (YangInstanceIdentifier.AugmentationIdentifier) pathArgument;
319
320                 // No Qname in augmentation identifier
321                 writeQNameSet(augmentationIdentifier.getPossibleChildNames());
322                 break;
323             default :
324                 throw new IllegalStateException("Unknown node identifier type is found : " + pathArgument.getClass().toString() );
325         }
326     }
327
328     private void writeKeyValueMap(Map<QName, Object> keyValueMap) throws IOException {
329         if(keyValueMap != null && !keyValueMap.isEmpty()) {
330             output.writeInt(keyValueMap.size());
331             Set<QName> qNameSet = keyValueMap.keySet();
332
333             for(QName qName : qNameSet) {
334                 writeQName(qName);
335                 writeObject(keyValueMap.get(qName));
336             }
337         } else {
338             output.writeInt(0);
339         }
340     }
341
342     private void writeQNameSet(Set<QName> children) throws IOException {
343         // Write each child's qname separately, if list is empty send count as 0
344         if(children != null && !children.isEmpty()) {
345             output.writeInt(children.size());
346             for(QName qName : children) {
347                 writeQName(qName);
348             }
349         } else {
350             LOG.debug("augmentation node does not have any child");
351             output.writeInt(0);
352         }
353     }
354
355     private void writeObject(Object value) throws IOException {
356
357         byte type = ValueTypes.getSerializableType(value);
358         // Write object type first
359         output.writeByte(type);
360
361         switch(type) {
362             case ValueTypes.BOOL_TYPE:
363                 output.writeBoolean((Boolean) value);
364                 break;
365             case ValueTypes.QNAME_TYPE:
366                 writeQName((QName) value);
367                 break;
368             case ValueTypes.INT_TYPE:
369                 output.writeInt((Integer) value);
370                 break;
371             case ValueTypes.BYTE_TYPE:
372                 output.writeByte((Byte) value);
373                 break;
374             case ValueTypes.LONG_TYPE:
375                 output.writeLong((Long) value);
376                 break;
377             case ValueTypes.SHORT_TYPE:
378                 output.writeShort((Short) value);
379                 break;
380             case ValueTypes.BITS_TYPE:
381                 writeObjSet((Set<?>) value);
382                 break;
383             case ValueTypes.BINARY_TYPE:
384                 byte[] bytes = (byte[]) value;
385                 output.writeInt(bytes.length);
386                 output.write(bytes);
387                 break;
388             case ValueTypes.YANG_IDENTIFIER_TYPE:
389                 writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value);
390                 break;
391             case ValueTypes.NULL_TYPE :
392                 break;
393             case ValueTypes.STRING_BYTES_TYPE:
394                 final byte[] valueBytes = value.toString().getBytes(StandardCharsets.UTF_8);
395                 output.writeInt(valueBytes.length);
396                 output.write(valueBytes);
397                 break;
398             default:
399                 output.writeUTF(value.toString());
400                 break;
401         }
402     }
403 }