Add support for coded QNames/AugmentationIdentifiers 04/82304/7
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 29 May 2019 09:27:21 +0000 (11:27 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 29 May 2019 18:05:08 +0000 (20:05 +0200)
Profiling is showing we are spending about 46% in readQName(),
which further breaks down to 30% in readCodedString() and 61%
in QNameFactory.create().

Out of 1.7M invocations of readQName(), we end up loading only
508 QNames -- which is understandable, as the number of QNames is
bound by the SchemaContext contents. This also applies to
AugmentationIdentifiers.

This patch defines a new streaming format version, which uses
the same linear coding approach we take for Strings, taking
hash-based lookups out of the hot path -- bringing reconstruction
time down from 36 to 19-24 seconds.

It has also the effect of reducing the snapshot size, in our testing
bringing a 388.8MiB snapshot down to 210.6MiB.

JIRA: CONTROLLER-1898
Change-Id: Ibbc96c286d52b3261b04dd7de02e13bc6b44e803
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
12 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/AbstractNormalizedNodeDataOutput.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/LithiumNormalizedNodeInputStreamReader.java [moved from opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java with 97% similarity]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/LithiumNormalizedNodeOutputStreamWriter.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputOutput.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamVersion.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SodiumNormalizedNodeInputStreamReader.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SodiumNormalizedNodeOutputStreamWriter.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/TokenTypes.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/VersionedNormalizedNodeDataInput.java

index cddcd8d..002781a 100755 (executable)
@@ -276,7 +276,7 @@ abstract class AbstractNormalizedNodeDataOutput implements NormalizedNodeDataOut
         LOG.trace("Starting a new augmentation node");
 
         output.writeByte(NodeTypes.AUGMENTATION_NODE);
-        writeQNameSet(identifier.getPossibleChildNames());
+        writeAugmentationIdentifier(identifier);
     }
 
     @Override
@@ -410,10 +410,8 @@ abstract class AbstractNormalizedNodeDataOutput implements NormalizedNodeDataOut
 
             case PathArgumentTypes.AUGMENTATION_IDENTIFIER :
 
-                AugmentationIdentifier augmentationIdentifier = (AugmentationIdentifier) pathArgument;
-
                 // No Qname in augmentation identifier
-                writeQNameSet(augmentationIdentifier.getPossibleChildNames());
+                writeAugmentationIdentifier((AugmentationIdentifier) pathArgument);
                 break;
             default :
                 throw new IllegalStateException("Unknown node identifier type is found : "
@@ -434,11 +432,12 @@ abstract class AbstractNormalizedNodeDataOutput implements NormalizedNodeDataOut
         }
     }
 
-    private void writeQNameSet(final Set<QName> children) throws IOException {
+    void writeAugmentationIdentifier(final AugmentationIdentifier aid) throws IOException {
+        final Set<QName> qnames = aid.getPossibleChildNames();
         // Write each child's qname separately, if list is empty send count as 0
-        if (children != null && !children.isEmpty()) {
-            output.writeInt(children.size());
-            for (QName qname : children) {
+        if (!qnames.isEmpty()) {
+            output.writeInt(qnames.size());
+            for (QName qname : qnames) {
                 writeQName(qname);
             }
         } else {
index 590a8b0..97beda3 100644 (file)
@@ -38,4 +38,9 @@ abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput imp
     public final SchemaPath readSchemaPath() throws IOException {
         return delegate().readSchemaPath();
     }
+
+    @Override
+    public final NormalizedNodeStreamVersion getVersion() throws IOException {
+        return delegate().getVersion();
+    }
 }
@@ -55,9 +55,9 @@ import org.xml.sax.SAXException;
  * nodes. This process goes in recursive manner, where each NodeTypes object signifies the start of the object, except
  * END_NODE. If a node can have children, then that node's end is calculated based on appearance of END_NODE.
  */
-public class NormalizedNodeInputStreamReader extends ForwardingDataInput implements NormalizedNodeDataInput {
+class LithiumNormalizedNodeInputStreamReader extends ForwardingDataInput implements NormalizedNodeDataInput {
 
-    private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeInputStreamReader.class);
+    private static final Logger LOG = LoggerFactory.getLogger(LithiumNormalizedNodeInputStreamReader.class);
 
     private final @NonNull DataInput input;
 
@@ -70,7 +70,7 @@ public class NormalizedNodeInputStreamReader extends ForwardingDataInput impleme
     @SuppressWarnings("rawtypes")
     private NormalizedNodeBuilder<NodeWithValue, Object, LeafSetEntryNode<Object>> leafSetEntryBuilder;
 
-    NormalizedNodeInputStreamReader(final DataInput input) {
+    LithiumNormalizedNodeInputStreamReader(final DataInput input) {
         this.input = requireNonNull(input);
     }
 
@@ -79,6 +79,11 @@ public class NormalizedNodeInputStreamReader extends ForwardingDataInput impleme
         return input;
     }
 
+    @Override
+    public NormalizedNodeStreamVersion getVersion() throws IOException {
+        return NormalizedNodeStreamVersion.LITHIUM;
+    }
+
     @Override
     public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
         return readNormalizedNodeInternal();
@@ -202,7 +207,7 @@ public class NormalizedNodeInputStreamReader extends ForwardingDataInput impleme
         }
     }
 
-    private QName readQName() throws IOException {
+    QName readQName() throws IOException {
         // Read in the same sequence of writing
         String localName = readCodedString();
         String namespace = readCodedString();
@@ -243,8 +248,7 @@ public class NormalizedNodeInputStreamReader extends ForwardingDataInput impleme
         return children;
     }
 
-    private AugmentationIdentifier readAugmentationIdentifier() throws IOException {
-        // FIXME: we should have a cache for these, too
+    AugmentationIdentifier readAugmentationIdentifier() throws IOException {
         return new AugmentationIdentifier(readQNameSet());
     }
 
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/LithiumNormalizedNodeOutputStreamWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/LithiumNormalizedNodeOutputStreamWriter.java
new file mode 100644 (file)
index 0000000..e777948
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.Revision;
+
+/**
+ * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
+ * a stream.
+ * A stream writer wrapper around this class will write node objects to stream in recursive manner.
+ * for example - If you have a ContainerNode which has a two LeafNode as children, then
+ * you will first call
+ * {@link #startContainerNode(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier, int)},
+ * then will call
+ * {@link #leafNode(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier, Object)} twice
+ * and then, {@link #endNode()} to end container node.
+ *
+ * <p>Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
+ * while reading.
+ */
+class LithiumNormalizedNodeOutputStreamWriter extends AbstractNormalizedNodeDataOutput {
+    private final Map<String, Integer> stringCodeMap = new HashMap<>();
+
+    LithiumNormalizedNodeOutputStreamWriter(final DataOutput output) {
+        super(output);
+    }
+
+    @Override
+    protected short streamVersion() {
+        return TokenTypes.LITHIUM_VERSION;
+    }
+
+    @Override
+    protected void writeQName(final QName qname) throws IOException {
+        writeString(qname.getLocalName());
+        writeString(qname.getNamespace().toString());
+        writeString(qname.getRevision().map(Revision::toString).orElse(null));
+    }
+
+    @Override
+    protected final void writeString(final String string) throws IOException {
+        if (string != null) {
+            final Integer value = stringCodeMap.get(string);
+            if (value == null) {
+                stringCodeMap.put(string, stringCodeMap.size());
+                writeByte(TokenTypes.IS_STRING_VALUE);
+                writeUTF(string);
+            } else {
+                writeByte(TokenTypes.IS_CODE_VALUE);
+                writeInt(value);
+            }
+        } else {
+            writeByte(TokenTypes.IS_NULL_VALUE);
+        }
+    }
+}
index 7d793b1..93eb55c 100644 (file)
@@ -35,4 +35,12 @@ public interface NormalizedNodeDataInput extends DataInput {
     PathArgument readPathArgument() throws IOException;
 
     SchemaPath readSchemaPath() throws IOException;
+
+    /**
+     * Return the version of the underlying input stream.
+     *
+     * @return Stream version
+     * @throws IOException if the version cannot be ascertained
+     */
+    NormalizedNodeStreamVersion getVersion() throws IOException;
 }
index f46a9e2..bcb4cbf 100644 (file)
@@ -43,7 +43,8 @@ public final class NormalizedNodeInputOutput {
     }
 
     /**
-     * Creates a new {@link NormalizedNodeDataOutput} instance that writes to the given output.
+     * Creates a new {@link NormalizedNodeDataOutput} instance that writes to the given output and latest current
+     * stream version.
      *
      * @param output the DataOutput to write to
      * @return a new {@link NormalizedNodeDataOutput} instance
@@ -51,4 +52,24 @@ public final class NormalizedNodeInputOutput {
     public static NormalizedNodeDataOutput newDataOutput(final @NonNull DataOutput output) {
         return new NormalizedNodeOutputStreamWriter(output);
     }
+
+    /**
+     * Creates a new {@link NormalizedNodeDataOutput} instance that writes to the given output.
+     *
+     * @param output the DataOutput to write to
+     * @param version Streaming version to use
+     * @return a new {@link NormalizedNodeDataOutput} instance
+     */
+    public static NormalizedNodeDataOutput newDataOutput(final @NonNull DataOutput output,
+            final @NonNull NormalizedNodeStreamVersion version) {
+        switch (version) {
+            case LITHIUM:
+                return new LithiumNormalizedNodeOutputStreamWriter(output);
+            case SODIUM:
+                return new SodiumNormalizedNodeOutputStreamWriter(output);
+            default:
+                throw new IllegalStateException("Unhandled version " + version);
+        }
+    }
+
 }
index 40ca4f2..a9515d2 100644 (file)
@@ -5,15 +5,9 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore.node.utils.stream;
 
 import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.Revision;
 
 /**
  * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
@@ -29,39 +23,9 @@ import org.opendaylight.yangtools.yang.common.Revision;
  * <p>Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
  * while reading.
  */
-final class NormalizedNodeOutputStreamWriter extends AbstractNormalizedNodeDataOutput {
-    private final Map<String, Integer> stringCodeMap = new HashMap<>();
-
+// FIXME: CONTROLLER-1888: switch this to Sodium once we have a corresponding datastore version
+class NormalizedNodeOutputStreamWriter extends LithiumNormalizedNodeOutputStreamWriter {
     NormalizedNodeOutputStreamWriter(final DataOutput output) {
         super(output);
     }
-
-    @Override
-    protected short streamVersion() {
-        return TokenTypes.LITHIUM_VERSION;
-    }
-
-    @Override
-    protected void writeQName(final QName qname) throws IOException {
-        writeString(qname.getLocalName());
-        writeString(qname.getNamespace().toString());
-        writeString(qname.getRevision().map(Revision::toString).orElse(null));
-    }
-
-    @Override
-    protected void writeString(final String string) throws IOException {
-        if (string != null) {
-            final Integer value = stringCodeMap.get(string);
-            if (value == null) {
-                stringCodeMap.put(string, stringCodeMap.size());
-                writeByte(TokenTypes.IS_STRING_VALUE);
-                writeUTF(string);
-            } else {
-                writeByte(TokenTypes.IS_CODE_VALUE);
-                writeInt(value);
-            }
-        } else {
-            writeByte(TokenTypes.IS_NULL_VALUE);
-        }
-    }
 }
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamVersion.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamVersion.java
new file mode 100644 (file)
index 0000000..97d0f04
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.annotations.Beta;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * Enumeration of all stream versions this implementation supports on both input and output.
+ */
+@Beta
+@NonNullByDefault
+public enum NormalizedNodeStreamVersion {
+    LITHIUM,
+    SODIUM;
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SodiumNormalizedNodeInputStreamReader.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SodiumNormalizedNodeInputStreamReader.java
new file mode 100644 (file)
index 0000000..eb7e9f1
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
+
+final class SodiumNormalizedNodeInputStreamReader extends LithiumNormalizedNodeInputStreamReader {
+    private final List<AugmentationIdentifier> codedAugments = new ArrayList<>();
+    private final List<QName> codedQNames = new ArrayList<>();
+
+    SodiumNormalizedNodeInputStreamReader(final DataInput input) {
+        super(input);
+    }
+
+    @Override
+    public NormalizedNodeStreamVersion getVersion() throws IOException {
+        return NormalizedNodeStreamVersion.SODIUM;
+    }
+
+    @Override
+    QName readQName() throws IOException {
+        final byte valueType = readByte();
+        switch (valueType) {
+            case TokenTypes.IS_QNAME_CODE:
+                return codedQName(readInt());
+            case TokenTypes.IS_QNAME_VALUE:
+                return rawQName();
+            default:
+                throw new IOException("Unhandled QName value type " + valueType);
+        }
+    }
+
+    @Override
+    AugmentationIdentifier readAugmentationIdentifier() throws IOException {
+        final byte valueType = readByte();
+        switch (valueType) {
+            case TokenTypes.IS_AUGMENT_CODE:
+                return codecAugmentId(readInt());
+            case TokenTypes.IS_AUGMENT_VALUE:
+                return rawAugmentId();
+            default:
+                throw new IOException("Unhandled QName value type " + valueType);
+        }
+    }
+
+    private QName codedQName(final int code) throws IOException {
+        try {
+            return codedQNames.get(code);
+        } catch (IndexOutOfBoundsException e) {
+            throw new IOException("QName code " + code + " was not found", e);
+        }
+    }
+
+    private QName rawQName() throws IOException {
+        final QName qname = super.readQName();
+        codedQNames.add(qname);
+        return qname;
+    }
+
+    private AugmentationIdentifier codecAugmentId(final int code) throws IOException {
+        try {
+            return codedAugments.get(code);
+        } catch (IndexOutOfBoundsException e) {
+            throw new IOException("QName set code " + code + " was not found", e);
+        }
+    }
+
+    private AugmentationIdentifier rawAugmentId() throws IOException {
+        final AugmentationIdentifier aid = super.readAugmentationIdentifier();
+        codedAugments.add(aid);
+        return aid;
+    }
+}
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SodiumNormalizedNodeOutputStreamWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/SodiumNormalizedNodeOutputStreamWriter.java
new file mode 100644 (file)
index 0000000..c549290
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
+
+/**
+ * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
+ * a stream.
+ * A stream writer wrapper around this class will write node objects to stream in recursive manner.
+ * for example - If you have a ContainerNode which has a two LeafNode as children, then
+ * you will first call
+ * {@link #startContainerNode(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier, int)},
+ * then will call
+ * {@link #leafNode(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier, Object)} twice
+ * and then, {@link #endNode()} to end container node.
+ *
+ * <p>Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
+ * while reading.
+ */
+class SodiumNormalizedNodeOutputStreamWriter extends LithiumNormalizedNodeOutputStreamWriter {
+    private final Map<AugmentationIdentifier, Integer> aidCodeMap = new HashMap<>();
+    private final Map<QName, Integer> qnameCodeMap = new HashMap<>();
+
+    SodiumNormalizedNodeOutputStreamWriter(final DataOutput output) {
+        super(output);
+    }
+
+    @Override
+    protected short streamVersion() {
+        return TokenTypes.SODIUM_VERSION;
+    }
+
+    @Override
+    protected final void writeQName(final QName qname) throws IOException {
+        final Integer value = qnameCodeMap.get(qname);
+        if (value == null) {
+            // Fresh QName, remember it and emit as three strings
+            qnameCodeMap.put(qname, qnameCodeMap.size());
+            writeByte(TokenTypes.IS_QNAME_VALUE);
+            super.writeQName(qname);
+        } else {
+            // We have already seen this QName: write its code
+            writeByte(TokenTypes.IS_QNAME_CODE);
+            writeInt(value);
+        }
+    }
+
+    @Override
+    void writeAugmentationIdentifier(final AugmentationIdentifier aid) throws IOException {
+        final Integer value = aidCodeMap.get(aid);
+        if (value == null) {
+            // Fresh QName, remember it and emit as three strings
+            aidCodeMap.put(aid, aidCodeMap.size());
+            writeByte(TokenTypes.IS_AUGMENT_VALUE);
+            super.writeAugmentationIdentifier(aid);
+        } else {
+            // We have already seen this QName set: write its code
+            writeByte(TokenTypes.IS_AUGMENT_CODE);
+            writeInt(value);
+        }
+    }
+}
index 90f0472..d41a4f0 100644 (file)
@@ -18,9 +18,19 @@ final class TokenTypes {
      * Original stream version. Uses a per-stream dictionary for strings. QNames are serialized as three strings.
      */
     static final short LITHIUM_VERSION = 1;
+    /**
+     * Revised stream version. Unlike {@link #LITHIUM_VERSION}, QNames are using a per-stream dictionary, too.
+     */
+    static final short SODIUM_VERSION = 2;
 
     // Tokens supported in LITHIUM_VERSION
     static final byte IS_CODE_VALUE = 1;
     static final byte IS_STRING_VALUE = 2;
     static final byte IS_NULL_VALUE = 3;
+
+    // Tokens supported in SODIUM_VERSION
+    static final byte IS_QNAME_CODE = 4;
+    static final byte IS_QNAME_VALUE = 5;
+    static final byte IS_AUGMENT_CODE = 6;
+    static final byte IS_AUGMENT_VALUE = 7;
 }
index 860fe39..9e7bbdc 100644 (file)
@@ -35,7 +35,10 @@ final class VersionedNormalizedNodeDataInput extends ForwardingNormalizedNodeDat
         final NormalizedNodeDataInput ret;
         switch (version) {
             case TokenTypes.LITHIUM_VERSION:
-                ret = new NormalizedNodeInputStreamReader(input);
+                ret = new LithiumNormalizedNodeInputStreamReader(input);
+                break;
+            case TokenTypes.SODIUM_VERSION:
+                ret = new SodiumNormalizedNodeInputStreamReader(input);
                 break;
             default:
                 throw defunct("Unhandled stream version %s", version);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.