From 285424b294ad2253a2ad405cb2a687d0acb0c982 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Mon, 2 Feb 2015 03:42:37 -0500 Subject: [PATCH] Add signature marker and version to NormalizedNode streaming For backwards compatibility, in some message classes, when de-serializing, we first try the new NormalizedNode streaming and if that fails we try the pre-Lithium protobuff de-serialization. However, we're relying on an IOException from the NormalizedNode streaming or it returning null which may not be reliable. In addition if protobuff fails as well we can't tell if it's a bad protobuff or streamed message and we squash the streaming exception. We don't want to masking streaming failures as that's what will be used going forward. To alleviate this, NormalizedNodeOutputStreamReader now writes an initial byte as a signature marker to identify it as a valid stream. NormalizedNodeInputStreamReader reads it, and if invalid, throws an exception. This is caught by SerializationUtils as an indication to try the legacy protobuff de-serialization. Also, I added a version number to the output stream so we can handle future changes for backwards compatibility. Change-Id: I19779aa83c33eadc6bf83adfe6aae0b9a27cdfae Signed-off-by: tpantelis --- .../InvalidNormalizedNodeStreamException.java | 24 +++++ .../NormalizedNodeInputStreamReader.java | 36 ++++++- .../NormalizedNodeOutputStreamWriter.java | 24 ++++- .../NormalizedNodeStreamReaderWriterTest.java | 100 +++++++++++++++--- .../datastore/utils/SerializationUtils.java | 36 ++++--- 5 files changed, 182 insertions(+), 38 deletions(-) create mode 100644 opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java new file mode 100644 index 0000000000..da60496a22 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.node.utils.stream; + +import java.io.IOException; + +/** + * Exception thrown from NormalizedNodeInputStreamReader when the input stream does not contain + * valid serialized data. + * + * @author Thomas Pantelis + */ +public class InvalidNormalizedNodeStreamException extends IOException { + private static final long serialVersionUID = 1L; + + public InvalidNormalizedNodeStreamException(String message) { + super(message); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java index cde338179b..bb2f5d41d9 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java @@ -69,6 +69,8 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead private final StringBuilder reusableStringBuilder = new StringBuilder(50); + private boolean readSignatureMarker = true; + public NormalizedNodeInputStreamReader(InputStream stream) throws IOException { Preconditions.checkNotNull(stream); input = new DataInputStream(stream); @@ -80,6 +82,25 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead @Override public NormalizedNode readNormalizedNode() throws IOException { + readSignatureMarkerAndVersionIfNeeded(); + return readNormalizedNodeInternal(); + } + + private void readSignatureMarkerAndVersionIfNeeded() throws IOException { + if(readSignatureMarker) { + readSignatureMarker = false; + + byte marker = input.readByte(); + if(marker != NormalizedNodeOutputStreamWriter.SIGNATURE_MARKER) { + throw new InvalidNormalizedNodeStreamException(String.format( + "Invalid signature marker: %d", marker)); + } + + input.readShort(); // read the version - not currently used/needed. + } + } + + private NormalizedNode readNormalizedNodeInternal() throws IOException { // each node should start with a byte byte nodeType = input.readByte(); @@ -284,7 +305,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead return bytes; case ValueTypes.YANG_IDENTIFIER_TYPE : - return readYangInstanceIdentifier(); + return readYangInstanceIdentifierInternal(); default : return null; @@ -292,6 +313,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead } public YangInstanceIdentifier readYangInstanceIdentifier() throws IOException { + readSignatureMarkerAndVersionIfNeeded(); + return readYangInstanceIdentifierInternal(); + } + + private YangInstanceIdentifier readYangInstanceIdentifierInternal() throws IOException { int size = input.readInt(); List pathArguments = new ArrayList<>(size); @@ -342,11 +368,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead lastLeafSetQName = nodeType; - LeafSetEntryNode child = (LeafSetEntryNode)readNormalizedNode(); + LeafSetEntryNode child = (LeafSetEntryNode)readNormalizedNodeInternal(); while(child != null) { builder.withChild(child); - child = (LeafSetEntryNode)readNormalizedNode(); + child = (LeafSetEntryNode)readNormalizedNodeInternal(); } return builder; } @@ -356,11 +382,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead NormalizedNodeContainerBuilder builder) throws IOException { LOG.debug("Reading data container (leaf nodes) nodes"); - NormalizedNode child = readNormalizedNode(); + NormalizedNode child = readNormalizedNodeInternal(); while(child != null) { builder.addChild(child); - child = readNormalizedNode(); + child = readNormalizedNodeInternal(); } return builder; } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java index 088f4dfbe9..d4aab036be 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java @@ -46,6 +46,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class); + static final byte SIGNATURE_MARKER = (byte) 0xab; + static final short CURRENT_VERSION = (short) 1; + static final byte IS_CODE_VALUE = 1; static final byte IS_STRING_VALUE = 2; static final byte IS_NULL_VALUE = 3; @@ -56,6 +59,8 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri private NormalizedNodeWriter normalizedNodeWriter; + private boolean wroteSignatureMarker; + public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException { Preconditions.checkNotNull(stream); output = new DataOutputStream(stream); @@ -74,9 +79,18 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri } public void writeNormalizedNode(NormalizedNode node) throws IOException { + writeSignatureMarkerAndVersionIfNeeded(); normalizedNodeWriter().write(node); } + private void writeSignatureMarkerAndVersionIfNeeded() throws IOException { + if(!wroteSignatureMarker) { + output.writeByte(SIGNATURE_MARKER); + output.writeShort(CURRENT_VERSION); + wroteSignatureMarker = true; + } + } + @Override public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException { Preconditions.checkNotNull(name, "Node identifier should not be null"); @@ -201,6 +215,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri private void startNode(final QName qName, byte nodeType) throws IOException { Preconditions.checkNotNull(qName, "QName of node identifier should not be null."); + + writeSignatureMarkerAndVersionIfNeeded(); + // First write the type of node output.writeByte(nodeType); // Write Start Tag @@ -247,6 +264,11 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri } public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException { + writeSignatureMarkerAndVersionIfNeeded(); + writeYangInstanceIdentifierInternal(identifier); + } + + private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException { Iterable pathArguments = identifier.getPathArguments(); int size = Iterables.size(pathArguments); output.writeInt(size); @@ -363,7 +385,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri output.write(bytes); break; case ValueTypes.YANG_IDENTIFIER_TYPE: - writeYangInstanceIdentifier((YangInstanceIdentifier) value); + writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value); break; case ValueTypes.NULL_TYPE : break; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java index 6528f2e4d2..67a342b440 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java @@ -15,15 +15,16 @@ import java.io.IOException; import org.apache.commons.lang.SerializationUtils; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; import org.opendaylight.controller.cluster.datastore.util.TestModel; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; -import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder; @@ -33,9 +34,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class NormalizedNodeStreamReaderWriterTest { @Test - public void testNormalizedNodeStreamReaderWriter() throws IOException { + public void testNormalizedNodeStreaming() throws IOException { - testNormalizedNodeStreamReaderWriter(createTestContainer()); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream); + + NormalizedNode testContainer = createTestContainer(); + writer.writeNormalizedNode(testContainer); QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster"); QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor"); @@ -43,9 +48,21 @@ public class NormalizedNodeStreamReaderWriterTest { withNodeIdentifier(new NodeIdentifier(toaster)). withChild(ImmutableNodes.leafNode(darknessFactor, "1000")).build(); - testNormalizedNodeStreamReaderWriter(Builders.containerBuilder(). + ContainerNode toasterContainer = Builders.containerBuilder(). withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)). - withChild(toasterNode).build()); + withChild(toasterNode).build(); + writer.writeNormalizedNode(toasterContainer); + + NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + + NormalizedNode node = reader.readNormalizedNode(); + Assert.assertEquals(testContainer, node); + + node = reader.readNormalizedNode(); + Assert.assertEquals(toasterContainer, node); + + writer.close(); } private NormalizedNode createTestContainer() { @@ -76,24 +93,75 @@ public class NormalizedNodeStreamReaderWriterTest { build(); } - private void testNormalizedNodeStreamReaderWriter(NormalizedNode input) throws IOException { + @Test + public void testYangInstanceIdentifierStreaming() throws IOException { + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH). + node(TestModel.OUTER_LIST_QNAME).nodeWithKey( + TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build(); - byte[] byteData = null; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + NormalizedNodeOutputStreamWriter writer = + new NormalizedNodeOutputStreamWriter(byteArrayOutputStream); + writer.writeYangInstanceIdentifier(path); + + NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + + YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier(); + Assert.assertEquals(path, newPath); + + writer.close(); + } + + @Test + public void testNormalizedNodeAndYangInstanceIdentifierStreaming() throws IOException { - try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream); - NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer); - normalizedNodeWriter.write(input); - byteData = byteArrayOutputStream.toByteArray(); + NormalizedNode testContainer = TestModel.createBaseTestContainerBuilder().build(); + writer.writeNormalizedNode(testContainer); - } + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH). + node(TestModel.OUTER_LIST_QNAME).nodeWithKey( + TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build(); + + writer.writeYangInstanceIdentifier(path); NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( - new ByteArrayInputStream(byteData)); + new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); NormalizedNode node = reader.readNormalizedNode(); - Assert.assertEquals(input, node); + Assert.assertEquals(testContainer, node); + + YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier(); + Assert.assertEquals(path, newPath); + + writer.close(); + } + + @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000) + public void testInvalidNormalizedNodeStream() throws IOException { + byte[] protobufBytes = new NormalizedNodeToNodeCodec(null).encode( + TestModel.createBaseTestContainerBuilder().build()).getNormalizedNode().toByteArray(); + + NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( + new ByteArrayInputStream(protobufBytes)); + + reader.readNormalizedNode(); + } + + @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000) + public void testInvalidYangInstanceIdentifierStream() throws IOException { + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).build(); + + byte[] protobufBytes = ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments( + InstanceIdentifierUtils.toSerializable(path)).build().toByteArray(); + + NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( + new ByteArrayInputStream(protobufBytes)); + + reader.readYangInstanceIdentifier(); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java index 5854932a6f..bf9f8d803a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java @@ -17,6 +17,7 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.InvalidNormalizedNodeStreamException; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; @@ -93,15 +94,19 @@ public final class SerializationUtils { } public static NormalizedNode deserializeNormalizedNode(DataInput in) { - try { - boolean present = in.readBoolean(); - if(present) { - NormalizedNodeInputStreamReader streamReader = streamReader(in); - return streamReader.readNormalizedNode(); - } - } catch (IOException e) { - throw new IllegalArgumentException("Error deserializing NormalizedNode", e); - } + try { + return tryDeserializeNormalizedNode(in); + } catch (IOException e) { + throw new IllegalArgumentException("Error deserializing NormalizedNode", e); + } + } + + private static NormalizedNode tryDeserializeNormalizedNode(DataInput in) throws IOException { + boolean present = in.readBoolean(); + if(present) { + NormalizedNodeInputStreamReader streamReader = streamReader(in); + return streamReader.readNormalizedNode(); + } return null; } @@ -109,18 +114,17 @@ public final class SerializationUtils { public static NormalizedNode deserializeNormalizedNode(byte [] bytes) { NormalizedNode node = null; try { - node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes))); - } catch(Exception e) { - } - - if(node == null) { - // Must be from legacy protobuf serialization - try that. + node = tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes))); + } catch(InvalidNormalizedNodeStreamException e) { + // Probably from legacy protobuf serialization - try that. try { NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes); node = new NormalizedNodeToNodeCodec(null).decode(serializedNode); - } catch (InvalidProtocolBufferException e) { + } catch (InvalidProtocolBufferException e2) { throw new IllegalArgumentException("Error deserializing NormalizedNode", e); } + } catch (IOException e) { + throw new IllegalArgumentException("Error deserializing NormalizedNode", e); } return node; -- 2.36.6