Add signature marker and version to NormalizedNode streaming 62/14862/2
authortpantelis <tpanteli@brocade.com>
Mon, 2 Feb 2015 08:42:37 +0000 (03:42 -0500)
committertpantelis <tpanteli@brocade.com>
Mon, 2 Feb 2015 21:32:39 +0000 (16:32 -0500)
For backwards compatibility, in some message classes, when
de-serializing, we first try the new NormalizedNode streaming and if
that fails we try the pre-Lithium protobuff de-serialization. However,
we're relying on an IOException from the NormalizedNode streaming or it
returning null which may not be reliable. In addition if protobuff fails
as well we can't tell if it's a bad protobuff or streamed message and we
squash the streaming exception. We don't want to masking streaming failures
as that's what will be used going forward.

To alleviate this, NormalizedNodeOutputStreamReader now writes an initial byte
as a signature marker to identify it as a valid stream.
NormalizedNodeInputStreamReader reads it, and if invalid, throws an
exception. This is caught by SerializationUtils as an indication to try
the legacy protobuff de-serialization.

Also, I added a version number to the output stream so we can handle
future changes for backwards compatibility.

Change-Id: I19779aa83c33eadc6bf83adfe6aae0b9a27cdfae
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java

diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java
new file mode 100644 (file)
index 0000000..da60496
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown from NormalizedNodeInputStreamReader when the input stream does not contain
+ * valid serialized data.
+ *
+ * @author Thomas Pantelis
+ */
+public class InvalidNormalizedNodeStreamException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidNormalizedNodeStreamException(String message) {
+        super(message);
+    }
+}
index cde338179ba727903e35d4513195cbbddd9b1b1c..bb2f5d41d920d5ce97c723fe6bc91cafc5cd6031 100644 (file)
@@ -69,6 +69,8 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
     private final StringBuilder reusableStringBuilder = new StringBuilder(50);
 
+    private boolean readSignatureMarker = true;
+
     public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         input = new DataInputStream(stream);
@@ -80,6 +82,25 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
     @Override
     public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+        readSignatureMarkerAndVersionIfNeeded();
+        return readNormalizedNodeInternal();
+    }
+
+    private void readSignatureMarkerAndVersionIfNeeded() throws IOException {
+        if(readSignatureMarker) {
+            readSignatureMarker = false;
+
+            byte marker = input.readByte();
+            if(marker != NormalizedNodeOutputStreamWriter.SIGNATURE_MARKER) {
+                throw new InvalidNormalizedNodeStreamException(String.format(
+                        "Invalid signature marker: %d", marker));
+            }
+
+            input.readShort(); // read the version - not currently used/needed.
+        }
+    }
+
+    private NormalizedNode<?, ?> readNormalizedNodeInternal() throws IOException {
         // each node should start with a byte
         byte nodeType = input.readByte();
 
@@ -284,7 +305,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
                 return bytes;
 
             case ValueTypes.YANG_IDENTIFIER_TYPE :
-                return readYangInstanceIdentifier();
+                return readYangInstanceIdentifierInternal();
 
             default :
                 return null;
@@ -292,6 +313,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
     }
 
     public YangInstanceIdentifier readYangInstanceIdentifier() throws IOException {
+        readSignatureMarkerAndVersionIfNeeded();
+        return readYangInstanceIdentifierInternal();
+    }
+
+    private YangInstanceIdentifier readYangInstanceIdentifierInternal() throws IOException {
         int size = input.readInt();
 
         List<PathArgument> pathArguments = new ArrayList<>(size);
@@ -342,11 +368,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
         lastLeafSetQName = nodeType;
 
-        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
+        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
 
         while(child != null) {
             builder.withChild(child);
-            child = (LeafSetEntryNode<Object>)readNormalizedNode();
+            child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
         }
         return builder;
     }
@@ -356,11 +382,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
             NormalizedNodeContainerBuilder builder) throws IOException {
         LOG.debug("Reading data container (leaf nodes) nodes");
 
-        NormalizedNode<?, ?> child = readNormalizedNode();
+        NormalizedNode<?, ?> child = readNormalizedNodeInternal();
 
         while(child != null) {
             builder.addChild(child);
-            child = readNormalizedNode();
+            child = readNormalizedNodeInternal();
         }
         return builder;
     }
index 088f4dfbe98a1358a980a84e8ad3adaa99736b57..d4aab036be21df1734f4f88bc590b35030a71d21 100644 (file)
@@ -46,6 +46,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
 
+    static final byte SIGNATURE_MARKER = (byte) 0xab;
+    static final short CURRENT_VERSION = (short) 1;
+
     static final byte IS_CODE_VALUE = 1;
     static final byte IS_STRING_VALUE = 2;
     static final byte IS_NULL_VALUE = 3;
@@ -56,6 +59,8 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private NormalizedNodeWriter normalizedNodeWriter;
 
+    private boolean wroteSignatureMarker;
+
     public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         output = new DataOutputStream(stream);
@@ -74,9 +79,18 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     }
 
     public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+        writeSignatureMarkerAndVersionIfNeeded();
         normalizedNodeWriter().write(node);
     }
 
+    private void writeSignatureMarkerAndVersionIfNeeded() throws IOException {
+        if(!wroteSignatureMarker) {
+            output.writeByte(SIGNATURE_MARKER);
+            output.writeShort(CURRENT_VERSION);
+            wroteSignatureMarker = true;
+        }
+    }
+
     @Override
     public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
         Preconditions.checkNotNull(name, "Node identifier should not be null");
@@ -201,6 +215,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     private void startNode(final QName qName, byte nodeType) throws IOException {
 
         Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+
+        writeSignatureMarkerAndVersionIfNeeded();
+
         // First write the type of node
         output.writeByte(nodeType);
         // Write Start Tag
@@ -247,6 +264,11 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     }
 
     public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
+        writeSignatureMarkerAndVersionIfNeeded();
+        writeYangInstanceIdentifierInternal(identifier);
+    }
+
+    private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException {
         Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
         int size = Iterables.size(pathArguments);
         output.writeInt(size);
@@ -363,7 +385,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
                 output.write(bytes);
                 break;
             case ValueTypes.YANG_IDENTIFIER_TYPE:
-                writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+                writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value);
                 break;
             case ValueTypes.NULL_TYPE :
                 break;
index 6528f2e4d2e6524f2f08acb750aae764c86dbc0b..67a342b440666e3febfd6e5b390953fa3d54bebf 100644 (file)
@@ -15,15 +15,16 @@ import java.io.IOException;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.cluster.datastore.util.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
@@ -33,9 +34,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class NormalizedNodeStreamReaderWriterTest {
 
     @Test
-    public void testNormalizedNodeStreamReaderWriter() throws IOException {
+    public void testNormalizedNodeStreaming() throws IOException {
 
-        testNormalizedNodeStreamReaderWriter(createTestContainer());
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+
+        NormalizedNode<?, ?> testContainer = createTestContainer();
+        writer.writeNormalizedNode(testContainer);
 
         QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster");
         QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor");
@@ -43,9 +48,21 @@ public class NormalizedNodeStreamReaderWriterTest {
                 withNodeIdentifier(new NodeIdentifier(toaster)).
                 withChild(ImmutableNodes.leafNode(darknessFactor, "1000")).build();
 
-        testNormalizedNodeStreamReaderWriter(Builders.containerBuilder().
+        ContainerNode toasterContainer = Builders.containerBuilder().
                 withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)).
-                withChild(toasterNode).build());
+                withChild(toasterNode).build();
+        writer.writeNormalizedNode(toasterContainer);
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+        NormalizedNode<?,?> node = reader.readNormalizedNode();
+        Assert.assertEquals(testContainer, node);
+
+        node = reader.readNormalizedNode();
+        Assert.assertEquals(toasterContainer, node);
+
+        writer.close();
     }
 
     private NormalizedNode<?, ?> createTestContainer() {
@@ -76,24 +93,75 @@ public class NormalizedNodeStreamReaderWriterTest {
                 build();
     }
 
-    private void testNormalizedNodeStreamReaderWriter(NormalizedNode<?, ?> input) throws IOException {
+    @Test
+    public void testYangInstanceIdentifierStreaming() throws IOException  {
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+                node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+                        TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
 
-        byte[] byteData = null;
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer =
+                new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+        writer.writeYangInstanceIdentifier(path);
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+        YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+        Assert.assertEquals(path, newPath);
+
+        writer.close();
+    }
+
+    @Test
+    public void testNormalizedNodeAndYangInstanceIdentifierStreaming() throws IOException {
 
-        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-            NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
 
-            NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
-            normalizedNodeWriter.write(input);
-            byteData = byteArrayOutputStream.toByteArray();
+        NormalizedNode<?, ?> testContainer = TestModel.createBaseTestContainerBuilder().build();
+        writer.writeNormalizedNode(testContainer);
 
-        }
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+                node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+                        TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
+
+        writer.writeYangInstanceIdentifier(path);
 
         NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
-                new ByteArrayInputStream(byteData));
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
 
         NormalizedNode<?,?> node = reader.readNormalizedNode();
-        Assert.assertEquals(input, node);
+        Assert.assertEquals(testContainer, node);
+
+        YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+        Assert.assertEquals(path, newPath);
+
+        writer.close();
+    }
+
+    @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+    public void testInvalidNormalizedNodeStream() throws IOException {
+        byte[] protobufBytes = new NormalizedNodeToNodeCodec(null).encode(
+                TestModel.createBaseTestContainerBuilder().build()).getNormalizedNode().toByteArray();
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(protobufBytes));
+
+        reader.readNormalizedNode();
+    }
+
+    @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+    public void testInvalidYangInstanceIdentifierStream() throws IOException {
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).build();
+
+        byte[] protobufBytes = ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments(
+                InstanceIdentifierUtils.toSerializable(path)).build().toByteArray();
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(protobufBytes));
+
+        reader.readYangInstanceIdentifier();
     }
 
     @Test
index 5854932a6fa0d999fe368aa61bfdd252821739c6..bf9f8d803ac00f0e8ce2cbe983f81bfb9e9a44d0 100644 (file)
@@ -17,6 +17,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.InvalidNormalizedNodeStreamException;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -93,15 +94,19 @@ public final class SerializationUtils {
     }
 
     public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
-            try {
-                boolean present = in.readBoolean();
-                if(present) {
-                    NormalizedNodeInputStreamReader streamReader = streamReader(in);
-                    return streamReader.readNormalizedNode();
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
-            }
+        try {
+            return tryDeserializeNormalizedNode(in);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+        }
+    }
+
+    private static NormalizedNode<?, ?> tryDeserializeNormalizedNode(DataInput in) throws IOException {
+        boolean present = in.readBoolean();
+        if(present) {
+            NormalizedNodeInputStreamReader streamReader = streamReader(in);
+            return streamReader.readNormalizedNode();
+        }
 
         return null;
     }
@@ -109,18 +114,17 @@ public final class SerializationUtils {
     public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
         NormalizedNode<?, ?> node = null;
         try {
-            node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
-        } catch(Exception e) {
-        }
-
-        if(node == null) {
-            // Must be from legacy protobuf serialization - try that.
+            node = tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
+        } catch(InvalidNormalizedNodeStreamException e) {
+            // Probably from legacy protobuf serialization - try that.
             try {
                 NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
                 node =  new NormalizedNodeToNodeCodec(null).decode(serializedNode);
-            } catch (InvalidProtocolBufferException e) {
+            } catch (InvalidProtocolBufferException e2) {
                 throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
             }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
         }
 
         return node;