Switch CompositeModification to bypass thread-local streams 88/82388/5
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 3 Jun 2019 17:43:22 +0000 (19:43 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 4 Jun 2019 00:19:35 +0000 (02:19 +0200)
Thread-local streams have little control over the version being
used, as the stream is allocated all over the place.

This model cannot function without explicit setup, which is used
by CompositeModification. This patch migrates CompositeModification
to use a new NormalizedNodeDataInput-based interface, so
thread-locals become obsolete.

JIRA: CONTROLLER-1888
Change-Id: I1111be5d6fc8c0129fb72232c3e713004068f152
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java

index c81468d6617a9a2302a17a2218c9e613764ebe2a..33bd4d45e191419d605153463eafd2e00987830e 100644 (file)
@@ -18,15 +18,20 @@ public abstract class AbstractModification implements Modification {
     private YangInstanceIdentifier path;
     private short version;
 
-    protected AbstractModification(short version) {
+    protected AbstractModification(final short version) {
         this.version = version;
     }
 
-    protected AbstractModification(YangInstanceIdentifier path) {
+    protected AbstractModification(final short version, final YangInstanceIdentifier path) {
+        this.path = path;
+        this.version = version;
+    }
+
+    protected AbstractModification(final YangInstanceIdentifier path) {
         this.path = path;
     }
 
-    protected void setPath(YangInstanceIdentifier path) {
+    protected void setPath(final YangInstanceIdentifier path) {
         this.path = path;
     }
 
index 347cde9f3fed6f727a96350e386d952a4bd513a9..ce4108dc7f7b83feea604bbc8b788fd7e44b2899 100644 (file)
@@ -8,9 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -26,21 +29,25 @@ public class DeleteModification extends AbstractModification {
         this(DataStoreVersions.CURRENT_VERSION);
     }
 
-    public DeleteModification(short version) {
+    public DeleteModification(final short version) {
         super(version);
     }
 
-    public DeleteModification(YangInstanceIdentifier path) {
+    public DeleteModification(final YangInstanceIdentifier path) {
         super(path);
     }
 
+    DeleteModification(final short version, final YangInstanceIdentifier path) {
+        super(version, path);
+    }
+
     @Override
-    public void apply(DOMStoreWriteTransaction transaction) {
+    public void apply(final DOMStoreWriteTransaction transaction) {
         transaction.delete(getPath());
     }
 
     @Override
-    public void apply(DataTreeModification transaction) {
+    public void apply(final DataTreeModification transaction) {
         transaction.delete(getPath());
     }
 
@@ -50,18 +57,22 @@ public class DeleteModification extends AbstractModification {
     }
 
     @Override
-    public void readExternal(ObjectInput in) {
+    public void readExternal(final ObjectInput in) {
         setPath(SerializationUtils.deserializePath(in));
     }
 
     @Override
-    public void writeExternal(ObjectOutput out) {
+    public void writeExternal(final ObjectOutput out) {
         SerializationUtils.serializePath(getPath(), out);
     }
 
-    public static DeleteModification fromStream(ObjectInput in, short version) {
-        DeleteModification mod = new DeleteModification(version);
-        mod.readExternal(in);
-        return mod;
+    @Override
+    public void writeTo(final NormalizedNodeDataOutput out) throws IOException {
+        out.writeYangInstanceIdentifier(getPath());
+    }
+
+    public static DeleteModification fromStream(final NormalizedNodeDataInput in, final short version)
+            throws IOException {
+        return new DeleteModification(version, in.readYangInstanceIdentifier());
     }
 }
index 465372736bb398769d811fd217319a1f1b694328..558641948a3101983b34d610e05a44cc622867bf 100644 (file)
@@ -8,8 +8,9 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
-import java.io.ObjectInput;
+import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -25,7 +26,7 @@ public class MergeModification extends WriteModification {
         this(DataStoreVersions.CURRENT_VERSION);
     }
 
-    public MergeModification(short version) {
+    public MergeModification(final short version) {
         super(version);
     }
 
@@ -33,6 +34,10 @@ public class MergeModification extends WriteModification {
         super(path, data);
     }
 
+    MergeModification(final short version, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        super(version, path, data);
+    }
+
     @Override
     public void apply(final DOMStoreWriteTransaction transaction) {
         transaction.merge(getPath(), getData());
@@ -48,9 +53,10 @@ public class MergeModification extends WriteModification {
         return MERGE;
     }
 
-    public static MergeModification fromStream(ObjectInput in, short version) {
-        MergeModification mod = new MergeModification(version);
-        mod.readExternal(in);
-        return mod;
+    public static MergeModification fromStream(final NormalizedNodeDataInput in, final short version)
+            throws IOException {
+        final NormalizedNode<?, ?> node = in.readNormalizedNode();
+        final YangInstanceIdentifier path = in.readYangInstanceIdentifier();
+        return new MergeModification(version, path, node);
     }
 }
index b11357e61ab13377e34bd80c1712c3d01fef26c0..eddfba2ce1ac4d173eb5fa4a8af4f9dc4f432e4e 100644 (file)
@@ -12,6 +12,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
@@ -59,4 +60,6 @@ public interface Modification extends Externalizable {
 
     @Override
     void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
+
+    void writeTo(NormalizedNodeDataOutput out) throws IOException;
 }
index 485bb42a94c5d0fc1bf70e66b58ce8f3444d51b5..79e3b149617bfd4a2ecebb7c245a949435ecaf00 100644 (file)
@@ -17,8 +17,9 @@ import java.util.Collections;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
-import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
@@ -35,19 +36,19 @@ public class MutableCompositeModification extends VersionedExternalizableMessage
         this(DataStoreVersions.CURRENT_VERSION);
     }
 
-    public MutableCompositeModification(short version) {
+    public MutableCompositeModification(final short version) {
         super(version);
     }
 
     @Override
-    public void apply(DOMStoreWriteTransaction transaction) {
+    public void apply(final DOMStoreWriteTransaction transaction) {
         for (Modification modification : modifications) {
             modification.apply(transaction);
         }
     }
 
     @Override
-    public void apply(DataTreeModification transaction) {
+    public void apply(final DataTreeModification transaction) {
         for (Modification modification : modifications) {
             modification.apply(transaction);
         }
@@ -63,12 +64,12 @@ public class MutableCompositeModification extends VersionedExternalizableMessage
      *
      * @param modification the modification to add.
      */
-    public void addModification(Modification modification) {
+    public void addModification(final Modification modification) {
         Preconditions.checkNotNull(modification);
         modifications.add(modification);
     }
 
-    public void addModifications(Iterable<Modification> newMods) {
+    public void addModifications(final Iterable<Modification> newMods) {
         for (Modification mod : newMods) {
             addModification(mod);
         }
@@ -84,61 +85,56 @@ public class MutableCompositeModification extends VersionedExternalizableMessage
     }
 
     @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
 
         int size = in.readInt();
-
-        if (size > 1) {
-            SerializationUtils.REUSABLE_READER_TL.set(NormalizedNodeInputOutput.newDataInputWithoutValidation(in));
-        }
-
-        try {
+        if (size > 0) {
+            final NormalizedNodeDataInput input = NormalizedNodeInputOutput.newDataInputWithoutValidation(in);
             for (int i = 0; i < size; i++) {
                 byte type = in.readByte();
                 switch (type) {
                     case Modification.WRITE:
-                        modifications.add(WriteModification.fromStream(in, getVersion()));
+                        modifications.add(WriteModification.fromStream(input, getVersion()));
                         break;
 
                     case Modification.MERGE:
-                        modifications.add(MergeModification.fromStream(in, getVersion()));
+                        modifications.add(MergeModification.fromStream(input, getVersion()));
                         break;
 
                     case Modification.DELETE:
-                        modifications.add(DeleteModification.fromStream(in, getVersion()));
+                        modifications.add(DeleteModification.fromStream(input, getVersion()));
                         break;
                     default:
                         break;
                 }
             }
-        } finally {
-            SerializationUtils.REUSABLE_READER_TL.remove();
         }
     }
 
     @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
+    public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
 
-        out.writeInt(modifications.size());
-
-        if (modifications.size() > 1) {
-            SerializationUtils.REUSABLE_WRITER_TL.set(NormalizedNodeInputOutput.newDataOutput(out));
-        }
-
-        try {
-            for (Modification mod: modifications) {
-                out.writeByte(mod.getType());
-                mod.writeExternal(out);
+        final int size = modifications.size();
+        out.writeInt(size);
+        if (size > 0) {
+            try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) {
+                for (Modification mod : modifications) {
+                    out.writeByte(mod.getType());
+                    mod.writeTo(stream);
+                }
             }
-        } finally {
-            SerializationUtils.REUSABLE_WRITER_TL.remove();
         }
     }
 
-    public static MutableCompositeModification fromSerializable(Object serializable) {
+    public static MutableCompositeModification fromSerializable(final Object serializable) {
         Preconditions.checkArgument(serializable instanceof MutableCompositeModification);
         return (MutableCompositeModification)serializable;
     }
+
+    @Override
+    public void writeTo(final NormalizedNodeDataOutput out) throws IOException {
+        throw new UnsupportedOperationException();
+    }
 }
index 70125e29e52a063f9e5d0e8cbe2db30d79f5d0bb..63fa74069a5fe5d310e481276ae83eaaf90d5717 100644 (file)
@@ -8,9 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils.Applier;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
@@ -30,10 +33,15 @@ public class WriteModification extends AbstractModification {
         this(DataStoreVersions.CURRENT_VERSION);
     }
 
-    public WriteModification(short version) {
+    public WriteModification(final short version) {
         super(version);
     }
 
+    WriteModification(final short version, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        super(version, path);
+        this.data = data;
+    }
+
     public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         super(path);
         this.data = data;
@@ -59,19 +67,26 @@ public class WriteModification extends AbstractModification {
     }
 
     @Override
-    public void readExternal(ObjectInput in) {
+    public void readExternal(final ObjectInput in) {
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
     @Override
-    public void writeExternal(ObjectOutput out) {
+    public void writeExternal(final ObjectOutput out) {
         SerializationUtils.serializePathAndNode(getPath(), data, out);
     }
 
-    public static WriteModification fromStream(ObjectInput in, short version) {
-        WriteModification mod = new WriteModification(version);
-        mod.readExternal(in);
-        return mod;
+    public static WriteModification fromStream(final NormalizedNodeDataInput in, final short version)
+            throws IOException {
+        final NormalizedNode<?, ?> node = in.readNormalizedNode();
+        final YangInstanceIdentifier path = in.readYangInstanceIdentifier();
+        return new WriteModification(version, path, node);
+    }
+
+    public void writeTo(final NormalizedNodeDataOutput out) throws IOException {
+        // FIXME: this should be inverted, as the path helps receivers in establishment of context
+        out.writeNormalizedNode(data);
+        out.writeYangInstanceIdentifier(getPath());
     }
 
     private static final Applier<WriteModification> APPLIER = (instance, path, node) -> {