Add support for reusable streaming 17/82417/8
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 6 Jun 2019 10:26:37 +0000 (12:26 +0200)
committerRobert Varga <nite@hq.sk>
Mon, 8 Jul 2019 08:48:19 +0000 (08:48 +0000)
With the actual implementations working on top of
NormalizedNodeStreamWriter, we gained the ability to flexibly
receive stream events.

This patch takes advantage of that flexibility by allowing
a ReusableImmutableNormalizedNodeStreamWriter to be the receiver
of the events -- thus allowing parts of the state involved in
building a NormalizedNode tree to be reused -- lowering GC
pressure.

A number of call sites, which can safely reuse such state are
converted to use the newly-introduced facility.

Change-Id: Iaf1b3ab2b2996e7004c036fc93a80a8ca8792314
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionModification.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/ForwardingNormalizedNodeDataInput.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeDataInput.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateInputOutput.java

index 026f3ea..d6fc4e5 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.controller.cluster.datastore.node.utils.stream.Normalize
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeStreamVersion;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 
 /**
  * Externalizable proxy for use with {@link ExistsTransactionRequest}. It implements the initial (Boron) serialization
@@ -59,8 +60,10 @@ final class ModifyTransactionRequestProxyV1 extends AbstractTransactionRequestPr
         if (size != 0) {
             modifications = new ArrayList<>(size);
             final NormalizedNodeDataInput nnin = NormalizedNodeInputOutput.newDataInput(in);
+            final ReusableImmutableNormalizedNodeStreamWriter writer =
+                    ReusableImmutableNormalizedNodeStreamWriter.create();
             for (int i = 0; i < size; ++i) {
-                modifications.add(TransactionModification.readFrom(nnin));
+                modifications.add(TransactionModification.readFrom(nnin, writer));
             }
         } else {
             modifications = ImmutableList.of();
index d711422..dcd05c3 100644 (file)
@@ -14,10 +14,12 @@ import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 
 /**
  * An individual modification of a transaction's state. This class and its subclasses are not serializable, but rather
- * expose {@link #writeTo(NormalizedNodeDataOutput)} and {@link #readFrom(NormalizedNodeDataInput)} methods for explicit
+ * expose {@link #writeTo(NormalizedNodeDataOutput)} and
+ * {@link #readFrom(NormalizedNodeDataInput, ReusableImmutableNormalizedNodeStreamWriter)} methods for explicit
  * serialization. The reason for this is that they are usually transmitted in bulk, hence it is advantageous to reuse
  * a {@link NormalizedNodeDataOutput} instance to achieve better compression.
  *
@@ -51,15 +53,16 @@ public abstract class TransactionModification {
         out.writeYangInstanceIdentifier(path);
     }
 
-    static TransactionModification readFrom(final NormalizedNodeDataInput in) throws IOException {
+    static TransactionModification readFrom(final NormalizedNodeDataInput in,
+            final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException {
         final byte type = in.readByte();
         switch (type) {
             case TYPE_DELETE:
                 return new TransactionDelete(in.readYangInstanceIdentifier());
             case TYPE_MERGE:
-                return new TransactionMerge(in.readYangInstanceIdentifier(), in.readNormalizedNode());
+                return new TransactionMerge(in.readYangInstanceIdentifier(), in.readNormalizedNode(writer));
             case TYPE_WRITE:
-                return new TransactionWrite(in.readYangInstanceIdentifier(), in.readNormalizedNode());
+                return new TransactionWrite(in.readYangInstanceIdentifier(), in.readNormalizedNode(writer));
             default:
                 throw new IllegalArgumentException("Unhandled type " + type);
         }
index 1c16069..a1a0d69 100644 (file)
@@ -14,6 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput implements NormalizedNodeDataInput {
@@ -31,6 +32,12 @@ abstract class ForwardingNormalizedNodeDataInput extends ForwardingDataInput imp
         return delegate().readNormalizedNode();
     }
 
+    @Override
+    public final NormalizedNode<?, ?> readNormalizedNode(final ReusableImmutableNormalizedNodeStreamWriter writer)
+            throws IOException {
+        return delegate().readNormalizedNode(writer);
+    }
+
     @Override
     public final YangInstanceIdentifier readYangInstanceIdentifier() throws IOException {
         return delegate().readYangInstanceIdentifier();
index 9dda8bf..57d41d5 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeResult;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 /**
@@ -52,6 +53,24 @@ public interface NormalizedNodeDataInput extends DataInput {
         return result.getResult();
     }
 
+    /**
+     * Read a normalized node from the reader, using specified writer to construct the result.
+     *
+     * @param writer Reusable writer to
+     * @return Next node from the stream, or null if end of stream has been reached.
+     * @throws IOException if an error occurs
+     * @throws IllegalStateException if the dictionary has been detached
+     */
+    default NormalizedNode<?, ?> readNormalizedNode(final ReusableImmutableNormalizedNodeStreamWriter writer)
+            throws IOException {
+        try {
+            streamNormalizedNode(writer);
+            return writer.getResult();
+        } finally {
+            writer.reset();
+        }
+    }
+
     YangInstanceIdentifier readYangInstanceIdentifier() throws IOException;
 
     @NonNull QName readQName() throws IOException;
index 5586419..7f2c3f2 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 
 /**
  * MergeModification stores all the parameters required to merge data into the specified path.
@@ -53,9 +54,9 @@ public class MergeModification extends WriteModification {
         return MERGE;
     }
 
-    public static MergeModification fromStream(final NormalizedNodeDataInput in, final short version)
-            throws IOException {
-        final NormalizedNode<?, ?> node = in.readNormalizedNode();
+    public static MergeModification fromStream(final NormalizedNodeDataInput in, final short version,
+            final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException {
+        final NormalizedNode<?, ?> node = in.readNormalizedNode(writer);
         final YangInstanceIdentifier path = in.readYangInstanceIdentifier();
         return new MergeModification(version, path, node);
     }
index ba05f27..d4abc04 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.controller.cluster.datastore.node.utils.stream.Normalize
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 
 /**
  * MutableCompositeModification is just a mutable version of a CompositeModification.
@@ -91,15 +92,18 @@ public class MutableCompositeModification extends VersionedExternalizableMessage
         int size = in.readInt();
         if (size > 0) {
             final NormalizedNodeDataInput input = NormalizedNodeInputOutput.newDataInputWithoutValidation(in);
+            final ReusableImmutableNormalizedNodeStreamWriter writer =
+                    ReusableImmutableNormalizedNodeStreamWriter.create();
+
             for (int i = 0; i < size; i++) {
                 byte type = in.readByte();
                 switch (type) {
                     case Modification.WRITE:
-                        modifications.add(WriteModification.fromStream(input, getVersion()));
+                        modifications.add(WriteModification.fromStream(input, getVersion(), writer));
                         break;
 
                     case Modification.MERGE:
-                        modifications.add(MergeModification.fromStream(input, getVersion()));
+                        modifications.add(MergeModification.fromStream(input, getVersion(), writer));
                         break;
 
                     case Modification.DELETE:
index da1fb70..609503f 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 
 /**
  * WriteModification stores all the parameters required to write data to the specified path.
@@ -78,9 +79,9 @@ public class WriteModification extends AbstractModification {
         SerializationUtils.writeNodeAndPath(out, getPath(), data);
     }
 
-    public static WriteModification fromStream(final NormalizedNodeDataInput in, final short version)
-            throws IOException {
-        final NormalizedNode<?, ?> node = in.readNormalizedNode();
+    public static WriteModification fromStream(final NormalizedNodeDataInput in, final short version,
+            final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException {
+        final NormalizedNode<?, ?> node = in.readNormalizedNode(writer);
         final YangInstanceIdentifier path = in.readYangInstanceIdentifier();
         return new WriteModification(version, path, node);
     }
index ea5fb53..4d38c35 100644 (file)
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,9 +98,14 @@ public final class CommitTransactionPayload extends Payload implements Serializa
     }
 
     public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate() throws IOException {
+        return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
+    }
+
+    public Entry<TransactionIdentifier, DataTreeCandidate> getCandidate(
+            final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException {
         final DataInput in = ByteStreams.newDataInput(serialized);
         return new SimpleImmutableEntry<>(TransactionIdentifier.readFrom(in),
-                DataTreeCandidateInputOutput.readDataTreeCandidate(in));
+                DataTreeCandidateInputOutput.readDataTreeCandidate(in, writer));
     }
 
     @Override
index 35d9998..30bbca2 100644 (file)
@@ -24,6 +24,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNod
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,10 +49,11 @@ public final class DataTreeCandidateInputOutput {
     }
 
     private static DataTreeCandidateNode readModifiedNode(final ModificationType type,
-            final NormalizedNodeDataInput in) throws IOException {
+            final NormalizedNodeDataInput in, final ReusableImmutableNormalizedNodeStreamWriter writer)
+                    throws IOException {
 
         final PathArgument identifier = in.readPathArgument();
-        final Collection<DataTreeCandidateNode> children = readChildren(in);
+        final Collection<DataTreeCandidateNode> children = readChildren(in, writer);
         if (children.isEmpty()) {
             LOG.debug("Modified node {} does not have any children, not instantiating it", identifier);
             return null;
@@ -60,7 +62,8 @@ public final class DataTreeCandidateInputOutput {
         return ModifiedDataTreeCandidateNode.create(identifier, type, children);
     }
 
-    private static Collection<DataTreeCandidateNode> readChildren(final NormalizedNodeDataInput in) throws IOException {
+    private static Collection<DataTreeCandidateNode> readChildren(final NormalizedNodeDataInput in,
+            final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException {
         final int size = in.readInt();
         if (size == 0) {
             return ImmutableList.of();
@@ -68,7 +71,7 @@ public final class DataTreeCandidateInputOutput {
 
         final Collection<DataTreeCandidateNode> ret = new ArrayList<>(size);
         for (int i = 0; i < size; ++i) {
-            final DataTreeCandidateNode child = readNode(in);
+            final DataTreeCandidateNode child = readNode(in, writer);
             if (child != null) {
                 ret.add(child);
             }
@@ -76,27 +79,29 @@ public final class DataTreeCandidateInputOutput {
         return ret;
     }
 
-    private static DataTreeCandidateNode readNode(final NormalizedNodeDataInput in) throws IOException {
+    private static DataTreeCandidateNode readNode(final NormalizedNodeDataInput in,
+            final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException {
         final byte type = in.readByte();
         switch (type) {
             case APPEARED:
-                return readModifiedNode(ModificationType.APPEARED, in);
+                return readModifiedNode(ModificationType.APPEARED, in, writer);
             case DELETE:
                 return DeletedDataTreeCandidateNode.create(in.readPathArgument());
             case DISAPPEARED:
-                return readModifiedNode(ModificationType.DISAPPEARED, in);
+                return readModifiedNode(ModificationType.DISAPPEARED, in, writer);
             case SUBTREE_MODIFIED:
-                return readModifiedNode(ModificationType.SUBTREE_MODIFIED, in);
+                return readModifiedNode(ModificationType.SUBTREE_MODIFIED, in, writer);
             case UNMODIFIED:
                 return null;
             case WRITE:
-                return DataTreeCandidateNodes.written(in.readNormalizedNode());
+                return DataTreeCandidateNodes.written(in.readNormalizedNode(writer));
             default:
                 throw new IllegalArgumentException("Unhandled node type " + type);
         }
     }
 
-    public static DataTreeCandidate readDataTreeCandidate(final DataInput in) throws IOException {
+    public static DataTreeCandidate readDataTreeCandidate(final DataInput in,
+            final ReusableImmutableNormalizedNodeStreamWriter writer) throws IOException {
         final NormalizedNodeDataInput reader = NormalizedNodeInputOutput.newDataInput(in);
         final YangInstanceIdentifier rootPath = reader.readYangInstanceIdentifier();
         final byte type = reader.readByte();
@@ -104,20 +109,22 @@ public final class DataTreeCandidateInputOutput {
         final DataTreeCandidateNode rootNode;
         switch (type) {
             case APPEARED:
-                rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.APPEARED, readChildren(reader));
+                rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.APPEARED,
+                    readChildren(reader, writer));
                 break;
             case DELETE:
                 rootNode = DeletedDataTreeCandidateNode.create();
                 break;
             case DISAPPEARED:
-                rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.DISAPPEARED, readChildren(reader));
+                rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.DISAPPEARED,
+                    readChildren(reader, writer));
                 break;
             case SUBTREE_MODIFIED:
                 rootNode = ModifiedDataTreeCandidateNode.create(ModificationType.SUBTREE_MODIFIED,
-                        readChildren(reader));
+                        readChildren(reader, writer));
                 break;
             case WRITE:
-                rootNode = DataTreeCandidateNodes.written(reader.readNormalizedNode());
+                rootNode = DataTreeCandidateNodes.written(reader.readNormalizedNode(writer));
                 break;
             case UNMODIFIED:
                 rootNode = AbstractDataTreeCandidateNode.createUnmodified();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.