Add streaming interface to NormalizedNodeDataInput 03/85403/3
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 28 Oct 2019 12:10:56 +0000 (13:10 +0100)
committerRobert Varga <nite@hq.sk>
Sat, 16 Nov 2019 12:47:16 +0000 (12:47 +0000)
NormalizedNodeDataInput allows reading NormalizedNodes, which is
wired directly to immutable node builders, providing little
flexibility.

This inflexibility requires us to re-stream the data set through
NormalizedNodePruner, effectively doubling memory requirements
during recovery.

Extend NormalizedNodeDataInput to have a NormalizedNodeStreamWriter
to be plugged in, so that the byte stream becomes a source of
NormalizedNodeStreamWriter events, which can be routed through
a custom-built pipeline.

readNormalizedNode() becomes a simple default wrapper, which
pipes the stream into an ImmutableNormalizedNodeStreamWriter.

JIRA: CONTROLLER-1889
Change-Id: Ic732ba9105dd7e27d5612853b931aba66bdd83a5
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit c7e9379c4e2f3b2e916d94c938623df7966c7fa0)

opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/AbstractLithiumDataInput.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java

index 88e6a08bf55e65c4c6f58e61d463cc53acf46b61..88b06a1ea03b7c06b0b6c61cefa3568a90fa4208 100644 (file)
@@ -37,13 +37,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeAttrBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeContainerBuilder;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,11 +60,6 @@ abstract class AbstractLithiumDataInput extends ForwardingDataInput implements N
 
     private QName lastLeafSetQName;
 
-    private NormalizedNodeAttrBuilder<NodeIdentifier, Object, LeafNode<Object>> leafBuilder;
-
-    @SuppressWarnings("rawtypes")
-    private NormalizedNodeAttrBuilder<NodeWithValue, Object, LeafSetEntryNode<Object>> leafSetEntryBuilder;
-
     AbstractLithiumDataInput(final DataInput input) {
         this.input = requireNonNull(input);
     }
@@ -81,114 +70,161 @@ abstract class AbstractLithiumDataInput extends ForwardingDataInput implements N
     }
 
     @Override
-    public final NormalizedNode<?, ?> readNormalizedNode() throws IOException {
-        return readNormalizedNodeInternal();
+    public final void streamNormalizedNode(final NormalizedNodeStreamWriter writer) throws IOException {
+        streamNormalizedNode(requireNonNull(writer), input.readByte());
     }
 
-    private NormalizedNode<?, ?> readNormalizedNodeInternal() throws IOException {
-        // each node should start with a byte
-        byte nodeType = input.readByte();
-
-        if (nodeType == NodeTypes.END_NODE) {
-            LOG.trace("End node reached. return");
-            lastLeafSetQName = null;
-            return null;
-        }
-
+    private void streamNormalizedNode(final NormalizedNodeStreamWriter writer, final byte nodeType) throws IOException {
         switch (nodeType) {
+            case NodeTypes.ANY_XML_NODE:
+                streamAnyxml(writer);
+                break;
             case NodeTypes.AUGMENTATION_NODE:
-                AugmentationIdentifier augIdentifier = readAugmentationIdentifier();
-                LOG.trace("Reading augmentation node {} ", augIdentifier);
-                return addDataContainerChildren(Builders.augmentationBuilder().withNodeIdentifier(augIdentifier))
-                        .build();
-
+                streamAugmentation(writer);
+                break;
+            case NodeTypes.CHOICE_NODE:
+                streamChoice(writer);
+                break;
+            case NodeTypes.CONTAINER_NODE:
+                streamContainer(writer);
+                break;
+            case NodeTypes.LEAF_NODE:
+                streamLeaf(writer);
+                break;
+            case NodeTypes.LEAF_SET:
+                streamLeafSet(writer);
+                break;
+            case NodeTypes.ORDERED_LEAF_SET:
+                streamOrderedLeafSet(writer);
+                break;
             case NodeTypes.LEAF_SET_ENTRY_NODE:
-                final QName name = lastLeafSetQName != null ? lastLeafSetQName : readQName();
-                final Object value = readObject();
-                final NodeWithValue<Object> leafIdentifier = new NodeWithValue<>(name, value);
-                LOG.trace("Reading leaf set entry node {}, value {}", leafIdentifier, value);
-                return leafSetEntryBuilder().withNodeIdentifier(leafIdentifier).withValue(value).build();
-
+                streamLeafSetEntry(writer);
+                break;
             case NodeTypes.MAP_ENTRY_NODE:
-                final NodeIdentifierWithPredicates entryIdentifier = readNormalizedNodeWithPredicates();
-                LOG.trace("Reading map entry node {} ", entryIdentifier);
-                return addDataContainerChildren(Builders.mapEntryBuilder().withNodeIdentifier(entryIdentifier))
-                        .build();
-
+                streamMapEntry(writer);
+                break;
+            case NodeTypes.MAP_NODE:
+                streamMap(writer);
+                break;
+            case NodeTypes.ORDERED_MAP_NODE:
+                streamOrderedMap(writer);
+                break;
+            case NodeTypes.UNKEYED_LIST:
+                streamUnkeyedList(writer);
+                break;
+            case NodeTypes.UNKEYED_LIST_ITEM:
+                streamUnkeyedListItem(writer);
+                break;
             default:
-                return readNodeIdentifierDependentNode(nodeType, readNodeIdentifier());
+                throw new InvalidNormalizedNodeStreamException("Unexpected node " + nodeType);
         }
     }
 
-    private NormalizedNodeAttrBuilder<NodeIdentifier, Object, LeafNode<Object>> leafBuilder() {
-        if (leafBuilder == null) {
-            leafBuilder = Builders.leafBuilder();
-        }
-
-        return leafBuilder;
+    private void streamAnyxml(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming anyxml node {}", identifier);
+        writer.anyxmlNode(identifier, readDOMSource());
     }
 
-    @SuppressWarnings("rawtypes")
-    private NormalizedNodeAttrBuilder<NodeWithValue, Object,
-                                      LeafSetEntryNode<Object>> leafSetEntryBuilder() {
-        if (leafSetEntryBuilder == null) {
-            leafSetEntryBuilder = Builders.leafSetEntryBuilder();
-        }
+    private void streamAugmentation(final NormalizedNodeStreamWriter writer) throws IOException {
+        final AugmentationIdentifier augIdentifier = readAugmentationIdentifier();
+        LOG.trace("Streaming augmentation node {}", augIdentifier);
+        writer.startAugmentationNode(augIdentifier);
+        commonStreamContainer(writer);
+    }
 
-        return leafSetEntryBuilder;
+    private void streamChoice(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming choice node {}", identifier);
+        writer.startChoiceNode(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamContainer(writer);
     }
 
-    private NormalizedNode<?, ?> readNodeIdentifierDependentNode(final byte nodeType, final NodeIdentifier identifier)
-        throws IOException {
+    private void streamContainer(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming container node {}", identifier);
+        writer.startContainerNode(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamContainer(writer);
+    }
 
-        switch (nodeType) {
-            case NodeTypes.LEAF_NODE:
-                LOG.trace("Read leaf node {}", identifier);
-                // Read the object value
-                return leafBuilder().withNodeIdentifier(identifier).withValue(readObject()).build();
+    private void streamLeaf(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming leaf node {}", identifier);
+        writer.leafNode(identifier, readObject());
+    }
 
-            case NodeTypes.ANY_XML_NODE:
-                LOG.trace("Read xml node");
-                return Builders.anyXmlBuilder().withNodeIdentifier(identifier).withValue(readDOMSource()).build();
+    private void streamLeafSet(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming leaf set node {}", identifier);
+        writer.startLeafSet(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamLeafSet(writer, identifier);
+    }
 
-            case NodeTypes.MAP_NODE:
-                LOG.trace("Read map node {}", identifier);
-                return addDataContainerChildren(Builders.mapBuilder().withNodeIdentifier(identifier)).build();
+    private void streamOrderedLeafSet(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming ordered leaf set node {}", identifier);
+        writer.startOrderedLeafSet(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamLeafSet(writer, identifier);
+    }
 
-            case NodeTypes.CHOICE_NODE:
-                LOG.trace("Read choice node {}", identifier);
-                return addDataContainerChildren(Builders.choiceBuilder().withNodeIdentifier(identifier)).build();
+    private void commonStreamLeafSet(final NormalizedNodeStreamWriter writer, final NodeIdentifier identifier)
+            throws IOException {
+        lastLeafSetQName = identifier.getNodeType();
+        try {
+            commonStreamContainer(writer);
+        } finally {
+            // Make sure we never leak this
+            lastLeafSetQName = null;
+        }
+    }
 
-            case NodeTypes.ORDERED_MAP_NODE:
-                LOG.trace("Reading ordered map node {}", identifier);
-                return addDataContainerChildren(Builders.orderedMapBuilder().withNodeIdentifier(identifier)).build();
+    private void streamLeafSetEntry(final NormalizedNodeStreamWriter writer) throws IOException {
+        final QName name = lastLeafSetQName != null ? lastLeafSetQName : readQName();
+        final Object value = readObject();
+        LOG.trace("Streaming leaf set entry node {}, value {}", name, value);
+        writer.leafSetEntryNode(name, value);
+    }
 
-            case NodeTypes.UNKEYED_LIST:
-                LOG.trace("Read unkeyed list node {}", identifier);
-                return addDataContainerChildren(Builders.unkeyedListBuilder().withNodeIdentifier(identifier)).build();
+    private void streamMap(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming map node {}", identifier);
+        writer.startMapNode(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamContainer(writer);
+    }
 
-            case NodeTypes.UNKEYED_LIST_ITEM:
-                LOG.trace("Read unkeyed list item node {}", identifier);
-                return addDataContainerChildren(Builders.unkeyedListEntryBuilder()
-                        .withNodeIdentifier(identifier)).build();
+    private void streamOrderedMap(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming ordered map node {}", identifier);
+        writer.startOrderedMapNode(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamContainer(writer);
+    }
 
-            case NodeTypes.CONTAINER_NODE:
-                LOG.trace("Read container node {}", identifier);
-                return addDataContainerChildren(Builders.containerBuilder().withNodeIdentifier(identifier)).build();
+    private void streamMapEntry(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifierWithPredicates entryIdentifier = readNormalizedNodeWithPredicates();
+        LOG.trace("Streaming map entry node {}", entryIdentifier);
+        writer.startMapEntryNode(entryIdentifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamContainer(writer);
+    }
 
-            case NodeTypes.LEAF_SET:
-                LOG.trace("Read leaf set node {}", identifier);
-                return addLeafSetChildren(identifier.getNodeType(),
-                        Builders.leafSetBuilder().withNodeIdentifier(identifier)).build();
+    private void streamUnkeyedList(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming unkeyed list node {}", identifier);
+        writer.startUnkeyedList(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamContainer(writer);
+    }
 
-            case NodeTypes.ORDERED_LEAF_SET:
-                LOG.trace("Read ordered leaf set node {}", identifier);
-                return addLeafSetChildren(identifier.getNodeType(),
-                        Builders.orderedLeafSetBuilder().withNodeIdentifier(identifier)).build();
+    private void streamUnkeyedListItem(final NormalizedNodeStreamWriter writer) throws IOException {
+        final NodeIdentifier identifier = readNodeIdentifier();
+        LOG.trace("Streaming unkeyed list item node {}", identifier);
+        writer.startUnkeyedListItem(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE);
+        commonStreamContainer(writer);
+    }
 
-            default:
-                return null;
+    private void commonStreamContainer(final NormalizedNodeStreamWriter writer) throws IOException {
+        for (byte nodeType = input.readByte(); nodeType != NodeTypes.END_NODE; nodeType = input.readByte()) {
+            streamNormalizedNode(writer, nodeType);
         }
+        writer.endNode();
     }
 
     private DOMSource readDOMSource() throws IOException {
@@ -244,7 +280,6 @@ abstract class AbstractLithiumDataInput extends ForwardingDataInput implements N
         return children;
     }
 
-
     abstract AugmentationIdentifier readAugmentationIdentifier() throws IOException;
 
     abstract NodeIdentifier readNodeIdentifier() throws IOException;
@@ -395,35 +430,4 @@ abstract class AbstractLithiumDataInput extends ForwardingDataInput implements N
                 return null;
         }
     }
-
-    @SuppressWarnings("unchecked")
-    private ListNodeBuilder<Object, LeafSetEntryNode<Object>> addLeafSetChildren(final QName nodeType,
-            final ListNodeBuilder<Object, LeafSetEntryNode<Object>> builder) throws IOException {
-
-        LOG.trace("Reading children of leaf set");
-
-        lastLeafSetQName = nodeType;
-
-        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
-
-        while (child != null) {
-            builder.withChild(child);
-            child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
-        }
-        return builder;
-    }
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    private NormalizedNodeContainerBuilder addDataContainerChildren(
-            final NormalizedNodeContainerBuilder builder) throws IOException {
-        LOG.trace("Reading data container (leaf nodes) nodes");
-
-        NormalizedNode<?, ?> child = readNormalizedNodeInternal();
-
-        while (child != null) {
-            builder.addChild(child);
-            child = readNormalizedNodeInternal();
-        }
-        return builder;
-    }
 }
index 97beda38aa945abc2c5d079db34c23663e0aa054..3cb865c0df7243ff1ea87acb8e9b84e2ff1b7f92 100644 (file)
@@ -12,6 +12,7 @@ import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput implements NormalizedNodeDataInput {
@@ -19,6 +20,11 @@ abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput imp
     @Override
     abstract @NonNull NormalizedNodeDataInput delegate() throws IOException;
 
+    @Override
+    public final void streamNormalizedNode(final NormalizedNodeStreamWriter writer) throws IOException {
+        delegate().streamNormalizedNode(writer);
+    }
+
     @Override
     public final NormalizedNode<?, ?> readNormalizedNode() throws IOException {
         return delegate().readNormalizedNode();
index 93eb55cc8e07b388a3865c9ae39cd8e966d0e25e..5f402c849f5e2b1cd20fc75cc55b2e92ce0fdbef 100644 (file)
@@ -13,6 +13,9 @@ import java.io.IOException;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeResult;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 /**
@@ -21,6 +24,16 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath;
  */
 @Beta
 public interface NormalizedNodeDataInput extends DataInput {
+    /**
+     * Interpret current stream position as a NormalizedNode, stream its events into a NormalizedNodeStreamWriter.
+     *
+     * @param writer Writer to emit events to
+     * @throws IOException if an error occurs
+     * @throws IllegalStateException if the dictionary has been detached
+     * @throws NullPointerException if {@code writer} is null
+     */
+    void streamNormalizedNode(NormalizedNodeStreamWriter writer) throws IOException;
+
     /**
      * Read a normalized node from the reader.
      *
@@ -28,7 +41,13 @@ public interface NormalizedNodeDataInput extends DataInput {
      * @throws IOException if an error occurs
      * @throws IllegalStateException if the dictionary has been detached
      */
-    NormalizedNode<?, ?> readNormalizedNode() throws IOException;
+    default NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+        final NormalizedNodeResult result = new NormalizedNodeResult();
+        try (NormalizedNodeStreamWriter writer = ImmutableNormalizedNodeStreamWriter.from(result)) {
+            streamNormalizedNode(writer);
+        }
+        return result.getResult();
+    }
 
     YangInstanceIdentifier readYangInstanceIdentifier() throws IOException;