From a2b838f96589b502578fa4e15cef2769f886a378 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 6 Jun 2019 12:26:37 +0200 Subject: [PATCH 1/1] Add support for reusable streaming With the actual implementations working on top of NormalizedNodeStreamWriter, we gained the ability to flexibly receive stream events. This patch takes advantage of that flexibility by allowing a ReusableImmutableNormalizedNodeStreamWriter to be the receiver of the events -- thus allowing parts of the state involved in building a NormalizedNode tree to be reused -- lowering GC pressure. A number of call sites, which can safely reuse such state are converted to use the newly-introduced facility. Change-Id: Iaf1b3ab2b2996e7004c036fc93a80a8ca8792314 Signed-off-by: Robert Varga --- .../ModifyTransactionRequestProxyV1.java | 5 ++- .../commands/TransactionModification.java | 11 +++--- .../ForwardingNormalizedNodeDataInput.java | 7 ++++ .../utils/stream/NormalizedNodeDataInput.java | 19 ++++++++++ .../modification/MergeModification.java | 7 ++-- .../MutableCompositeModification.java | 8 +++-- .../modification/WriteModification.java | 7 ++-- .../persisted/CommitTransactionPayload.java | 8 ++++- .../DataTreeCandidateInputOutput.java | 35 +++++++++++-------- 9 files changed, 79 insertions(+), 28 deletions(-) diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java index 026f3ead7b..d6fc4e5c37 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java @@ -22,6 +22,7 @@ import org.opendaylight.controller.cluster.datastore.node.utils.stream.Normalize import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeStreamVersion; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; /** * Externalizable proxy for use with {@link ExistsTransactionRequest}. It implements the initial (Boron) serialization @@ -59,8 +60,10 @@ final class ModifyTransactionRequestProxyV1 extends AbstractTransactionRequestPr if (size != 0) { modifications = new ArrayList<>(size); final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in); + final ReusableImmutableNormalizedNodeStreamWriter writer = + ReusableImmutableNormalizedNodeStreamWriter.create(); for (int i = 0; i < size; ++i) { - modifications.add(TransactionModification.readFrom(nnin)); + modifications.add(TransactionModification.readFrom(nnin, writer)); } } else { modifications = ImmutableList.of(); diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionModification.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionModification.java index d71142201c..dcd05c3514 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionModification.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionModification.java @@ -14,10 +14,12 @@ import java.io.IOException; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; /** * An individual modification of a transaction's state. This class and its subclasses are not serializable, but rather - * expose {@link #writeTo(NormalizedNodeDataOutput)} and {@link #readFrom(NormalizedNodeDataInput)} methods for explicit + * expose {@link #writeTo(NormalizedNodeDataOutput)} and + * {@link #readFrom(NormalizedNodeDataInput, ReusableImmutableNormalizedNodeStreamWriter)} methods for explicit * serialization. The reason for this is that they are usually transmitted in bulk, hence it is advantageous to reuse * a {@link NormalizedNodeDataOutput} instance to achieve better compression. * @@ -51,15 +53,16 @@ public abstract class TransactionModification { out.writeYangInstanceIdentifier(path); } - static TransactionModification readFrom(final NormalizedNodeDataInput in) throws IOException { + static TransactionModification readFrom(final NormalizedNodeDataInput in, + final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException { final byte type = in.readByte(); switch (type) { case TYPE_DELETE: return new TransactionDelete(in.readYangInstanceIdentifier()); case TYPE_MERGE: - return new TransactionMerge(in.readYangInstanceIdentifier(), in.readNormalizedNode()); + return new TransactionMerge(in.readYangInstanceIdentifier(), in.readNormalizedNode(writer)); case TYPE_WRITE: - return new TransactionWrite(in.readYangInstanceIdentifier(), in.readNormalizedNode()); + return new TransactionWrite(in.readYangInstanceIdentifier(), in.readNormalizedNode(writer)); default: throw new IllegalArgumentException("Unhandled type " + type); } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java index 1c160698a4..a1a0d69426 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java @@ -14,6 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.model.api.SchemaPath; abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput implements NormalizedNodeDataInput { @@ -31,6 +32,12 @@ abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput imp return delegate().readNormalizedNode(); } + @Override + public final NormalizedNode readNormalizedNode(final ReusableImmutableNormalizedNodeStreamWriter writer) + throws IOException { + return delegate().readNormalizedNode(writer); + } + @Override public final YangInstanceIdentifier readYangInstanceIdentifier() throws IOException { return delegate().readYangInstanceIdentifier(); diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java index 9dda8bf6e4..57d41d5835 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java @@ -19,6 +19,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeResult; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.model.api.SchemaPath; /** @@ -52,6 +53,24 @@ public interface NormalizedNodeDataInput extends DataInput { return result.getResult(); } + /** + * Read a normalized node from the reader, using specified writer to construct the result. + * + * @param writer Reusable writer to + * @return Next node from the stream, or null if end of stream has been reached. + * @throws IOException if an error occurs + * @throws IllegalStateException if the dictionary has been detached + */ + default NormalizedNode readNormalizedNode(final ReusableImmutableNormalizedNodeStreamWriter writer) + throws IOException { + try { + streamNormalizedNode(writer); + return writer.getResult(); + } finally { + writer.reset(); + } + } + YangInstanceIdentifier readYangInstanceIdentifier() throws IOException; @NonNull QName readQName() throws IOException; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java index 558641948a..7f2c3f23b5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java @@ -15,6 +15,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; /** * MergeModification stores all the parameters required to merge data into the specified path. @@ -53,9 +54,9 @@ public class MergeModification extends WriteModification { return MERGE; } - public static MergeModification fromStream(final NormalizedNodeDataInput in, final short version) - throws IOException { - final NormalizedNode node = in.readNormalizedNode(); + public static MergeModification fromStream(final NormalizedNodeDataInput in, final short version, + final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException { + final NormalizedNode node = in.readNormalizedNode(writer); final YangInstanceIdentifier path = in.readYangInstanceIdentifier(); return new MergeModification(version, path, node); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java index ba05f27e0d..d4abc0452e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java @@ -23,6 +23,7 @@ import org.opendaylight.controller.cluster.datastore.node.utils.stream.Normalize import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; /** * MutableCompositeModification is just a mutable version of a CompositeModification. @@ -91,15 +92,18 @@ public class MutableCompositeModification extends VersionedExternalizableMessage int size = in.readInt(); if (size > 0) { final NormalizedNodeDataInput input = NormalizedNodeInputOutput.newDataInputWithoutValidation(in); + final ReusableImmutableNormalizedNodeStreamWriter writer = + ReusableImmutableNormalizedNodeStreamWriter.create(); + for (int i = 0; i < size; i++) { byte type = in.readByte(); switch (type) { case Modification.WRITE: - modifications.add(WriteModification.fromStream(input, getVersion())); + modifications.add(WriteModification.fromStream(input, getVersion(), writer)); break; case Modification.MERGE: - modifications.add(MergeModification.fromStream(input, getVersion())); + modifications.add(MergeModification.fromStream(input, getVersion(), writer)); break; case Modification.DELETE: diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java index da1fb70485..609503f568 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java @@ -19,6 +19,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; /** * WriteModification stores all the parameters required to write data to the specified path. @@ -78,9 +79,9 @@ public class WriteModification extends AbstractModification { SerializationUtils.writeNodeAndPath(out, getPath(), data); } - public static WriteModification fromStream(final NormalizedNodeDataInput in, final short version) - throws IOException { - final NormalizedNode node = in.readNormalizedNode(); + public static WriteModification fromStream(final NormalizedNodeDataInput in, final short version, + final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException { + final NormalizedNode node = in.readNormalizedNode(writer); final YangInstanceIdentifier path = in.readYangInstanceIdentifier(); return new WriteModification(version, path, node); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java index ea5fb53255..4d38c35d48 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -23,6 +23,7 @@ import java.util.Map.Entry; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -97,9 +98,14 @@ public final class CommitTransactionPayload extends Payload implements Serializa } public Entry getCandidate() throws IOException { + return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create()); + } + + public Entry getCandidate( + final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException { final DataInput in = ByteStreams.newDataInput(serialized); return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in), - DataTreeCandidateInputOutput.readDataTreeCandidate(in)); + DataTreeCandidateInputOutput.readDataTreeCandidate(in, writer)); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateInputOutput.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateInputOutput.java index 35d9998cd6..30bbca235b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateInputOutput.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateInputOutput.java @@ -24,6 +24,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNod import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; +import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,10 +49,11 @@ public final class DataTreeCandidateInputOutput { } private static DataTreeCandidateNode readModifiedNode(final ModificationType type, - final NormalizedNodeDataInput in) throws IOException { + final NormalizedNodeDataInput in, final ReusableImmutableNormalizedNodeStreamWriter writer) + throws IOException { final PathArgument identifier = in.readPathArgument(); - final Collection children = readChildren(in); + final Collection children = readChildren(in, writer); if (children.isEmpty()) { LOG.debug("Modified node {} does not have any children, not instantiating it", identifier); return null; @@ -60,7 +62,8 @@ public final class DataTreeCandidateInputOutput { return ModifiedDataTreeCandidateNode.create(identifier, type, children); } - private static Collection readChildren(final NormalizedNodeDataInput in) throws IOException { + private static Collection readChildren(final NormalizedNodeDataInput in, + final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException { final int size = in.readInt(); if (size == 0) { return ImmutableList.of(); @@ -68,7 +71,7 @@ public final class DataTreeCandidateInputOutput { final Collection ret = new ArrayList<>(size); for (int i = 0; i < size; ++i) { - final DataTreeCandidateNode child = readNode(in); + final DataTreeCandidateNode child = readNode(in, writer); if (child != null) { ret.add(child); } @@ -76,27 +79,29 @@ public final class DataTreeCandidateInputOutput { return ret; } - private static DataTreeCandidateNode readNode(final NormalizedNodeDataInput in) throws IOException { + private static DataTreeCandidateNode readNode(final NormalizedNodeDataInput in, + final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException { final byte type = in.readByte(); switch (type) { case APPEARED: - return readModifiedNode(ModificationType.APPEARED, in); + return readModifiedNode(ModificationType.APPEARED, in, writer); case DELETE: return DeletedDataTreeCandidateNode.create(in.readPathArgument()); case DISAPPEARED: - return readModifiedNode(ModificationType.DISAPPEARED, in); + return readModifiedNode(ModificationType.DISAPPEARED, in, writer); case SUBTREE_MODIFIED: - return readModifiedNode(ModificationType.SUBTREE_MODIFIED, in); + return readModifiedNode(ModificationType.SUBTREE_MODIFIED, in, writer); case UNMODIFIED: return null; case WRITE: - return DataTreeCandidateNodes.written(in.readNormalizedNode()); + return DataTreeCandidateNodes.written(in.readNormalizedNode(writer)); default: throw new IllegalArgumentException("Unhandled node type " + type); } } - public static DataTreeCandidate readDataTreeCandidate(final DataInput in) throws IOException { + public static DataTreeCandidate readDataTreeCandidate(final DataInput in, + final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException { final NormalizedNodeDataInput reader = NormalizedNodeInputOutput.newDataInput(in); final YangInstanceIdentifier rootPath = reader.readYangInstanceIdentifier(); final byte type = reader.readByte(); @@ -104,20 +109,22 @@ public final class DataTreeCandidateInputOutput { final DataTreeCandidateNode rootNode; switch (type) { case APPEARED: - rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.APPEARED, readChildren(reader)); + rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.APPEARED, + readChildren(reader, writer)); break; case DELETE: rootNode = DeletedDataTreeCandidateNode.create(); break; case DISAPPEARED: - rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.DISAPPEARED, readChildren(reader)); + rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.DISAPPEARED, + readChildren(reader, writer)); break; case SUBTREE_MODIFIED: rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.SUBTREE_MODIFIED, - readChildren(reader)); + readChildren(reader, writer)); break; case WRITE: - rootNode = DataTreeCandidateNodes.written(reader.readNormalizedNode()); + rootNode = DataTreeCandidateNodes.written(reader.readNormalizedNode(writer)); break; case UNMODIFIED: rootNode = AbstractDataTreeCandidateNode.createUnmodified(); -- 2.36.6