Switch CompositeModification to bypass thread-local streams 88/82388/5
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 3 Jun 2019 17:43:22 +0000 (19:43 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 4 Jun 2019 00:19:35 +0000 (02:19 +0200)
Thread-local streams have little control over the version being
used, as the stream is allocated all over the place.

This model cannot function without explicit setup, which is used
by CompositeModification. This patch migrates CompositeModification
to use a new NormalizedNodeDataInput-based interface, so
thread-locals become obsolete.

JIRA: CONTROLLER-1888
Change-Id: I1111be5d6fc8c0129fb72232c3e713004068f152
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java

index c81468d..33bd4d4 100644 (file)
@@ -18,15 +18,20 @@ public abstract class AbstractModification implements Modification {
     private YangInstanceIdentifier path;
     private short version;
 
-    protected AbstractModification(short version) {
+    protected AbstractModification(final short version) {
         this.version = version;
     }
 
-    protected AbstractModification(YangInstanceIdentifier path) {
+    protected AbstractModification(final short version, final YangInstanceIdentifier path) {
+        this.path = path;
+        this.version = version;
+    }
+
+    protected AbstractModification(final YangInstanceIdentifier path) {
         this.path = path;
     }
 
-    protected void setPath(YangInstanceIdentifier path) {
+    protected void setPath(final YangInstanceIdentifier path) {
         this.path = path;
     }
 
index 347cde9..ce4108d 100644 (file)
@@ -8,9 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -26,21 +29,25 @@ public class DeleteModification extends AbstractModification {
         this(DataStoreVersions.CURRENT_VERSION);
     }
 
-    public DeleteModification(short version) {
+    public DeleteModification(final short version) {
         super(version);
     }
 
-    public DeleteModification(YangInstanceIdentifier path) {
+    public DeleteModification(final YangInstanceIdentifier path) {
         super(path);
     }
 
+    DeleteModification(final short version, final YangInstanceIdentifier path) {
+        super(version, path);
+    }
+
     @Override
-    public void apply(DOMStoreWriteTransaction transaction) {
+    public void apply(final DOMStoreWriteTransaction transaction) {
         transaction.delete(getPath());
     }
 
     @Override
-    public void apply(DataTreeModification transaction) {
+    public void apply(final DataTreeModification transaction) {
         transaction.delete(getPath());
     }
 
@@ -50,18 +57,22 @@ public class DeleteModification extends AbstractModification {
     }
 
     @Override
-    public void readExternal(ObjectInput in) {
+    public void readExternal(final ObjectInput in) {
         setPath(SerializationUtils.deserializePath(in));
     }
 
     @Override
-    public void writeExternal(ObjectOutput out) {
+    public void writeExternal(final ObjectOutput out) {
         SerializationUtils.serializePath(getPath(), out);
     }
 
-    public static DeleteModification fromStream(ObjectInput in, short version) {
-        DeleteModification mod = new DeleteModification(version);
-        mod.readExternal(in);
-        return mod;
+    @Override
+    public void writeTo(final NormalizedNodeDataOutput out) throws IOException {
+        out.writeYangInstanceIdentifier(getPath());
+    }
+
+    public static DeleteModification fromStream(final NormalizedNodeDataInput in, final short version)
+            throws IOException {
+        return new DeleteModification(version, in.readYangInstanceIdentifier());
     }
 }
index 4653727..5586419 100644 (file)
@@ -8,8 +8,9 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
-import java.io.ObjectInput;
+import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -25,7 +26,7 @@ public class MergeModification extends WriteModification {
         this(DataStoreVersions.CURRENT_VERSION);
     }
 
-    public MergeModification(short version) {
+    public MergeModification(final short version) {
         super(version);
     }
 
@@ -33,6 +34,10 @@ public class MergeModification extends WriteModification {
         super(path, data);
     }
 
+    MergeModification(final short version, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        super(version, path, data);
+    }
+
     @Override
     public void apply(final DOMStoreWriteTransaction transaction) {
         transaction.merge(getPath(), getData());
@@ -48,9 +53,10 @@ public class MergeModification extends WriteModification {
         return MERGE;
     }
 
-    public static MergeModification fromStream(ObjectInput in, short version) {
-        MergeModification mod = new MergeModification(version);
-        mod.readExternal(in);
-        return mod;
+    public static MergeModification fromStream(final NormalizedNodeDataInput in, final short version)
+            throws IOException {
+        final NormalizedNode<?, ?> node = in.readNormalizedNode();
+        final YangInstanceIdentifier path = in.readYangInstanceIdentifier();
+        return new MergeModification(version, path, node);
     }
 }
index b11357e..eddfba2 100644 (file)
@@ -12,6 +12,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
@@ -59,4 +60,6 @@ public interface Modification extends Externalizable {
 
     @Override
     void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
+
+    void writeTo(NormalizedNodeDataOutput out) throws IOException;
 }
index 485bb42..79e3b14 100644 (file)
@@ -17,8 +17,9 @@ import java.util.Collections;
 import java.util.List;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput;
-import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
@@ -35,19 +36,19 @@ public class MutableCompositeModification extends VersionedExternalizableMessage
         this(DataStoreVersions.CURRENT_VERSION);
     }
 
-    public MutableCompositeModification(short version) {
+    public MutableCompositeModification(final short version) {
         super(version);
     }
 
     @Override
-    public void apply(DOMStoreWriteTransaction transaction) {
+    public void apply(final DOMStoreWriteTransaction transaction) {
         for (Modification modification : modifications) {
             modification.apply(transaction);
         }
     }
 
     @Override
-    public void apply(DataTreeModification transaction) {
+    public void apply(final DataTreeModification transaction) {
         for (Modification modification : modifications) {
             modification.apply(transaction);
         }
@@ -63,12 +64,12 @@ public class MutableCompositeModification extends VersionedExternalizableMessage
      *
      * @param modification the modification to add.
      */
-    public void addModification(Modification modification) {
+    public void addModification(final Modification modification) {
         Preconditions.checkNotNull(modification);
         modifications.add(modification);
     }
 
-    public void addModifications(Iterable<Modification> newMods) {
+    public void addModifications(final Iterable<Modification> newMods) {
         for (Modification mod : newMods) {
             addModification(mod);
         }
@@ -84,61 +85,56 @@ public class MutableCompositeModification extends VersionedExternalizableMessage
     }
 
     @Override
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
 
         int size = in.readInt();
-
-        if (size > 1) {
-            SerializationUtils.REUSABLE_READER_TL.set(NormalizedNodeInputOutput.newDataInputWithoutValidation(in));
-        }
-
-        try {
+        if (size > 0) {
+            final NormalizedNodeDataInput input = NormalizedNodeInputOutput.newDataInputWithoutValidation(in);
             for (int i = 0; i < size; i++) {
                 byte type = in.readByte();
                 switch (type) {
                     case Modification.WRITE:
-                        modifications.add(WriteModification.fromStream(in, getVersion()));
+                        modifications.add(WriteModification.fromStream(input, getVersion()));
                         break;
 
                     case Modification.MERGE:
-                        modifications.add(MergeModification.fromStream(in, getVersion()));
+                        modifications.add(MergeModification.fromStream(input, getVersion()));
                         break;
 
                     case Modification.DELETE:
-                        modifications.add(DeleteModification.fromStream(in, getVersion()));
+                        modifications.add(DeleteModification.fromStream(input, getVersion()));
                         break;
                     default:
                         break;
                 }
             }
-        } finally {
-            SerializationUtils.REUSABLE_READER_TL.remove();
         }
     }
 
     @Override
-    public void writeExternal(ObjectOutput out) throws IOException {
+    public void writeExternal(final ObjectOutput out) throws IOException {
         super.writeExternal(out);
 
-        out.writeInt(modifications.size());
-
-        if (modifications.size() > 1) {
-            SerializationUtils.REUSABLE_WRITER_TL.set(NormalizedNodeInputOutput.newDataOutput(out));
-        }
-
-        try {
-            for (Modification mod: modifications) {
-                out.writeByte(mod.getType());
-                mod.writeExternal(out);
+        final int size = modifications.size();
+        out.writeInt(size);
+        if (size > 0) {
+            try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) {
+                for (Modification mod : modifications) {
+                    out.writeByte(mod.getType());
+                    mod.writeTo(stream);
+                }
             }
-        } finally {
-            SerializationUtils.REUSABLE_WRITER_TL.remove();
         }
     }
 
-    public static MutableCompositeModification fromSerializable(Object serializable) {
+    public static MutableCompositeModification fromSerializable(final Object serializable) {
         Preconditions.checkArgument(serializable instanceof MutableCompositeModification);
         return (MutableCompositeModification)serializable;
     }
+
+    @Override
+    public void writeTo(final NormalizedNodeDataOutput out) throws IOException {
+        throw new UnsupportedOperationException();
+    }
 }
index 70125e2..63fa740 100644 (file)
@@ -8,9 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils.Applier;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
@@ -30,10 +33,15 @@ public class WriteModification extends AbstractModification {
         this(DataStoreVersions.CURRENT_VERSION);
     }
 
-    public WriteModification(short version) {
+    public WriteModification(final short version) {
         super(version);
     }
 
+    WriteModification(final short version, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        super(version, path);
+        this.data = data;
+    }
+
     public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         super(path);
         this.data = data;
@@ -59,19 +67,26 @@ public class WriteModification extends AbstractModification {
     }
 
     @Override
-    public void readExternal(ObjectInput in) {
+    public void readExternal(final ObjectInput in) {
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
     @Override
-    public void writeExternal(ObjectOutput out) {
+    public void writeExternal(final ObjectOutput out) {
         SerializationUtils.serializePathAndNode(getPath(), data, out);
     }
 
-    public static WriteModification fromStream(ObjectInput in, short version) {
-        WriteModification mod = new WriteModification(version);
-        mod.readExternal(in);
-        return mod;
+    public static WriteModification fromStream(final NormalizedNodeDataInput in, final short version)
+            throws IOException {
+        final NormalizedNode<?, ?> node = in.readNormalizedNode();
+        final YangInstanceIdentifier path = in.readYangInstanceIdentifier();
+        return new WriteModification(version, path, node);
+    }
+
+    public void writeTo(final NormalizedNodeDataOutput out) throws IOException {
+        // FIXME: this should be inverted, as the path helps receivers in establishment of context
+        out.writeNormalizedNode(data);
+        out.writeYangInstanceIdentifier(getPath());
     }
 
     private static final Applier<WriteModification> APPLIER = (instance, path, node) -> {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.