From: Robert Varga Date: Mon, 13 May 2019 05:44:05 +0000 (+0200) Subject: Add streaming interface to NormalizedNodeDataInput X-Git-Tag: release/sodium~38 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F09%2F82009%2F24 Add streaming interface to NormalizedNodeDataInput NormalizedNodeDataInput allows reading NormalizedNodes, which is wired directly to immutable node builders, providing little flexibility. This inflexibility requires us to re-stream the data set through NormalizedNodePruner, effectively doubling memory requirements during recovery. Extend NormalizedNodeDataInput to have a NormalizedNodeStreamWriter to be plugged in, so that the byte stream becomes a source of NormalizedNodeStreamWriter events, which can be routed through a custom-built pipeline. readNormalizedNode() becomes a simple default wrapper, which pipes the stream into an ImmutableNormalizedNodeStreamWriter. JIRA: CONTROLLER-1889 Change-Id: Ic732ba9105dd7e27d5612853b931aba66bdd83a5 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java index cc28b03e65..1c160698a4 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java @@ -13,6 +13,7 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.model.api.SchemaPath; abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput implements NormalizedNodeDataInput { @@ -20,6 +21,11 @@ abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput imp @Override abstract @NonNull NormalizedNodeDataInput delegate() throws IOException; + @Override + public final void streamNormalizedNode(final NormalizedNodeStreamWriter writer) throws IOException { + delegate().streamNormalizedNode(writer); + } + @Override public final NormalizedNode readNormalizedNode() throws IOException { return delegate().readNormalizedNode(); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/LithiumNormalizedNodeInputStreamReader.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/LithiumNormalizedNodeInputStreamReader.java index 1a0d6dd94b..552f1e96a2 100755 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/LithiumNormalizedNodeInputStreamReader.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/LithiumNormalizedNodeInputStreamReader.java @@ -37,13 +37,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; -import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeContainerBuilder; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,11 +60,6 @@ class LithiumNormalizedNodeInputStreamReader extends ForwardingDataInput impleme private QName lastLeafSetQName; - private NormalizedNodeBuilder> leafBuilder; - - @SuppressWarnings("rawtypes") - private NormalizedNodeBuilder> leafSetEntryBuilder; - LithiumNormalizedNodeInputStreamReader(final DataInput input) { this.input = requireNonNull(input); } @@ -86,113 +75,168 @@ class LithiumNormalizedNodeInputStreamReader extends ForwardingDataInput impleme } @Override - public NormalizedNode readNormalizedNode() throws IOException { - return readNormalizedNodeInternal(); + public void streamNormalizedNode(final NormalizedNodeStreamWriter writer) throws IOException { + streamNormalizedNode(requireNonNull(writer), input.readByte()); } - private NormalizedNode readNormalizedNodeInternal() throws IOException { - // each node should start with a byte - byte nodeType = input.readByte(); - - if (nodeType == NodeTypes.END_NODE) { - LOG.trace("End node reached. return"); - lastLeafSetQName = null; - return null; - } - + private void streamNormalizedNode(final NormalizedNodeStreamWriter writer, final byte nodeType) throws IOException { switch (nodeType) { + case NodeTypes.ANY_XML_NODE: + streamAnyxml(writer); + break; case NodeTypes.AUGMENTATION_NODE: - AugmentationIdentifier augIdentifier = readAugmentationIdentifier(); - LOG.trace("Reading augmentation node {} ", augIdentifier); - return addDataContainerChildren(Builders.augmentationBuilder().withNodeIdentifier(augIdentifier)) - .build(); - + streamAugmentation(writer); + break; + case NodeTypes.CHOICE_NODE: + streamChoice(writer); + break; + case NodeTypes.CONTAINER_NODE: + streamContainer(writer); + break; + case NodeTypes.LEAF_NODE: + streamLeaf(writer); + break; + case NodeTypes.LEAF_SET: + streamLeafSet(writer); + break; + case NodeTypes.ORDERED_LEAF_SET: + streamOrderedLeafSet(writer); + break; case NodeTypes.LEAF_SET_ENTRY_NODE: - final QName name = lastLeafSetQName != null ? lastLeafSetQName : readQName(); - final Object value = readObject(); - final NodeWithValue leafIdentifier = new NodeWithValue<>(name, value); - LOG.trace("Reading leaf set entry node {}, value {}", leafIdentifier, value); - return leafSetEntryBuilder().withNodeIdentifier(leafIdentifier).withValue(value).build(); - + streamLeafSetEntry(writer); + break; case NodeTypes.MAP_ENTRY_NODE: - final NodeIdentifierWithPredicates entryIdentifier = readNormalizedNodeWithPredicates(); - LOG.trace("Reading map entry node {} ", entryIdentifier); - return addDataContainerChildren(Builders.mapEntryBuilder().withNodeIdentifier(entryIdentifier)) - .build(); - + streamMapEntry(writer); + break; + case NodeTypes.MAP_NODE: + streamMap(writer); + break; + case NodeTypes.ORDERED_MAP_NODE: + streamOrderedMap(writer); + break; + case NodeTypes.UNKEYED_LIST: + streamUnkeyedList(writer); + break; + case NodeTypes.UNKEYED_LIST_ITEM: + streamUnkeyedListItem(writer); + break; default: - return readNodeIdentifierDependentNode(nodeType, readNodeIdentifier()); + throw new InvalidNormalizedNodeStreamException("Unexpected node " + nodeType); } } - private NormalizedNodeBuilder> leafBuilder() { - if (leafBuilder == null) { - leafBuilder = Builders.leafBuilder(); - } - - return leafBuilder; + private void streamAnyxml(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming anyxml node {}", identifier); + writer.startAnyxmlNode(identifier); + writer.domSourceValue(readDOMSource()); + writer.endNode(); } - @SuppressWarnings("rawtypes") - private NormalizedNodeBuilder> leafSetEntryBuilder() { - if (leafSetEntryBuilder == null) { - leafSetEntryBuilder = Builders.leafSetEntryBuilder(); - } + private void streamAugmentation(final NormalizedNodeStreamWriter writer) throws IOException { + final AugmentationIdentifier augIdentifier = readAugmentationIdentifier(); + LOG.trace("Streaming augmentation node {}", augIdentifier); + writer.startAugmentationNode(augIdentifier); + commonStreamContainer(writer); + } - return leafSetEntryBuilder; + private void streamChoice(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming choice node {}", identifier); + writer.startChoiceNode(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamContainer(writer); } - private NormalizedNode readNodeIdentifierDependentNode(final byte nodeType, final NodeIdentifier identifier) - throws IOException { + private void streamContainer(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming container node {}", identifier); + writer.startContainerNode(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamContainer(writer); + } - switch (nodeType) { - case NodeTypes.LEAF_NODE: - LOG.trace("Read leaf node {}", identifier); - // Read the object value - return leafBuilder().withNodeIdentifier(identifier).withValue(readObject()).build(); + private void streamLeaf(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming leaf node {}", identifier); + writer.startLeafNode(identifier); + writer.scalarValue(readObject()); + writer.endNode(); + } - case NodeTypes.ANY_XML_NODE: - LOG.trace("Read xml node"); - return Builders.anyXmlBuilder().withNodeIdentifier(identifier).withValue(readDOMSource()).build(); + private void streamLeafSet(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming leaf set node {}", identifier); + writer.startLeafSet(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamLeafSet(writer, identifier); + } - case NodeTypes.MAP_NODE: - LOG.trace("Read map node {}", identifier); - return addDataContainerChildren(Builders.mapBuilder().withNodeIdentifier(identifier)).build(); + private void streamOrderedLeafSet(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming ordered leaf set node {}", identifier); + writer.startOrderedLeafSet(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamLeafSet(writer, identifier); + } - case NodeTypes.CHOICE_NODE: - LOG.trace("Read choice node {}", identifier); - return addDataContainerChildren(Builders.choiceBuilder().withNodeIdentifier(identifier)).build(); + private void commonStreamLeafSet(final NormalizedNodeStreamWriter writer, final NodeIdentifier identifier) + throws IOException { + lastLeafSetQName = identifier.getNodeType(); + try { + commonStreamContainer(writer); + } finally { + // Make sure we never leak this + lastLeafSetQName = null; + } + } - case NodeTypes.ORDERED_MAP_NODE: - LOG.trace("Reading ordered map node {}", identifier); - return addDataContainerChildren(Builders.orderedMapBuilder().withNodeIdentifier(identifier)).build(); + private void streamLeafSetEntry(final NormalizedNodeStreamWriter writer) throws IOException { + final QName name = lastLeafSetQName != null ? lastLeafSetQName : readQName(); + final Object value = readObject(); + final NodeWithValue leafIdentifier = new NodeWithValue<>(name, value); + LOG.trace("Streaming leaf set entry node {}, value {}", leafIdentifier, value); + writer.startLeafSetEntryNode(leafIdentifier); + writer.scalarValue(value); + writer.endNode(); + } - case NodeTypes.UNKEYED_LIST: - LOG.trace("Read unkeyed list node {}", identifier); - return addDataContainerChildren(Builders.unkeyedListBuilder().withNodeIdentifier(identifier)).build(); + private void streamMap(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming map node {}", identifier); + writer.startMapNode(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamContainer(writer); + } - case NodeTypes.UNKEYED_LIST_ITEM: - LOG.trace("Read unkeyed list item node {}", identifier); - return addDataContainerChildren(Builders.unkeyedListEntryBuilder() - .withNodeIdentifier(identifier)).build(); + private void streamOrderedMap(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming ordered map node {}", identifier); + writer.startOrderedMapNode(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamContainer(writer); + } - case NodeTypes.CONTAINER_NODE: - LOG.trace("Read container node {}", identifier); - return addDataContainerChildren(Builders.containerBuilder().withNodeIdentifier(identifier)).build(); + private void streamMapEntry(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifierWithPredicates entryIdentifier = readNormalizedNodeWithPredicates(); + LOG.trace("Streaming map entry node {}", entryIdentifier); + writer.startMapEntryNode(entryIdentifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamContainer(writer); + } - case NodeTypes.LEAF_SET: - LOG.trace("Read leaf set node {}", identifier); - return addLeafSetChildren(identifier.getNodeType(), - Builders.leafSetBuilder().withNodeIdentifier(identifier)).build(); + private void streamUnkeyedList(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming unkeyed list node {}", identifier); + writer.startUnkeyedList(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamContainer(writer); + } - case NodeTypes.ORDERED_LEAF_SET: - LOG.trace("Read ordered leaf set node {}", identifier); - return addLeafSetChildren(identifier.getNodeType(), - Builders.orderedLeafSetBuilder().withNodeIdentifier(identifier)).build(); + private void streamUnkeyedListItem(final NormalizedNodeStreamWriter writer) throws IOException { + final NodeIdentifier identifier = readNodeIdentifier(); + LOG.trace("Streaming unkeyed list item node {}", identifier); + writer.startUnkeyedListItem(identifier, NormalizedNodeStreamWriter.UNKNOWN_SIZE); + commonStreamContainer(writer); + } - default: - return null; + private void commonStreamContainer(final NormalizedNodeStreamWriter writer) throws IOException { + for (byte nodeType = input.readByte(); nodeType != NodeTypes.END_NODE; nodeType = input.readByte()) { + streamNormalizedNode(writer, nodeType); } + writer.endNode(); } private DOMSource readDOMSource() throws IOException { @@ -397,35 +441,4 @@ class LithiumNormalizedNodeInputStreamReader extends ForwardingDataInput impleme return null; } } - - @SuppressWarnings("unchecked") - private ListNodeBuilder> addLeafSetChildren(final QName nodeType, - final ListNodeBuilder> builder) throws IOException { - - LOG.trace("Reading children of leaf set"); - - lastLeafSetQName = nodeType; - - LeafSetEntryNode child = (LeafSetEntryNode)readNormalizedNodeInternal(); - - while (child != null) { - builder.withChild(child); - child = (LeafSetEntryNode)readNormalizedNodeInternal(); - } - return builder; - } - - @SuppressWarnings({ "unchecked", "rawtypes" }) - private NormalizedNodeContainerBuilder addDataContainerChildren( - final NormalizedNodeContainerBuilder builder) throws IOException { - LOG.trace("Reading data container (leaf nodes) nodes"); - - NormalizedNode child = readNormalizedNodeInternal(); - - while (child != null) { - builder.addChild(child); - child = readNormalizedNodeInternal(); - } - return builder; - } } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java index 37152f32e9..9dda8bf6e4 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java @@ -16,6 +16,9 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeResult; import org.opendaylight.yangtools.yang.model.api.SchemaPath; /** @@ -24,6 +27,16 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath; */ @Beta public interface NormalizedNodeDataInput extends DataInput { + /** + * Interpret current stream position as a NormalizedNode, stream its events into a NormalizedNodeStreamWriter. + * + * @param writer Writer to emit events to + * @throws IOException if an error occurs + * @throws IllegalStateException if the dictionary has been detached + * @throws NullPointerException if {@code writer} is null + */ + void streamNormalizedNode(NormalizedNodeStreamWriter writer) throws IOException; + /** * Read a normalized node from the reader. * @@ -31,7 +44,13 @@ public interface NormalizedNodeDataInput extends DataInput { * @throws IOException if an error occurs * @throws IllegalStateException if the dictionary has been detached */ - NormalizedNode readNormalizedNode() throws IOException; + default NormalizedNode readNormalizedNode() throws IOException { + final NormalizedNodeResult result = new NormalizedNodeResult(); + try (NormalizedNodeStreamWriter writer = ImmutableNormalizedNodeStreamWriter.from(result)) { + streamNormalizedNode(writer); + } + return result.getResult(); + } YangInstanceIdentifier readYangInstanceIdentifier() throws IOException;