From: Robert Varga Date: Mon, 3 Jun 2019 17:43:22 +0000 (+0200) Subject: Switch CompositeModification to bypass thread-local streams X-Git-Tag: release/sodium~65 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=b12758e87e3da8b71b574614f8829e5fcbce11e8;hp=35dfbc096623d860b6e24b2d17ac6e2c6a3dca1c Switch CompositeModification to bypass thread-local streams Thread-local streams have little control over the version being used, as the stream is allocated all over the place. This model cannot function without explicit setup, which is used by CompositeModification. This patch migrates CompositeModification to use a new NormalizedNodeDataInput-based interface, so thread-locals become obsolete. JIRA: CONTROLLER-1888 Change-Id: I1111be5d6fc8c0129fb72232c3e713004068f152 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java index c81468d661..33bd4d45e1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java @@ -18,15 +18,20 @@ public abstract class AbstractModification implements Modification { private YangInstanceIdentifier path; private short version; - protected AbstractModification(short version) { + protected AbstractModification(final short version) { this.version = version; } - protected AbstractModification(YangInstanceIdentifier path) { + protected AbstractModification(final short version, final YangInstanceIdentifier path) { + this.path = path; + this.version = version; + } + + protected AbstractModification(final YangInstanceIdentifier path) { this.path = path; } - protected void setPath(YangInstanceIdentifier path) { + protected void setPath(final YangInstanceIdentifier path) { this.path = path; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java index 347cde9f3f..ce4108dc7f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java @@ -8,9 +8,12 @@ package org.opendaylight.controller.cluster.datastore.modification; +import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -26,21 +29,25 @@ public class DeleteModification extends AbstractModification { this(DataStoreVersions.CURRENT_VERSION); } - public DeleteModification(short version) { + public DeleteModification(final short version) { super(version); } - public DeleteModification(YangInstanceIdentifier path) { + public DeleteModification(final YangInstanceIdentifier path) { super(path); } + DeleteModification(final short version, final YangInstanceIdentifier path) { + super(version, path); + } + @Override - public void apply(DOMStoreWriteTransaction transaction) { + public void apply(final DOMStoreWriteTransaction transaction) { transaction.delete(getPath()); } @Override - public void apply(DataTreeModification transaction) { + public void apply(final DataTreeModification transaction) { transaction.delete(getPath()); } @@ -50,18 +57,22 @@ public class DeleteModification extends AbstractModification { } @Override - public void readExternal(ObjectInput in) { + public void readExternal(final ObjectInput in) { setPath(SerializationUtils.deserializePath(in)); } @Override - public void writeExternal(ObjectOutput out) { + public void writeExternal(final ObjectOutput out) { SerializationUtils.serializePath(getPath(), out); } - public static DeleteModification fromStream(ObjectInput in, short version) { - DeleteModification mod = new DeleteModification(version); - mod.readExternal(in); - return mod; + @Override + public void writeTo(final NormalizedNodeDataOutput out) throws IOException { + out.writeYangInstanceIdentifier(getPath()); + } + + public static DeleteModification fromStream(final NormalizedNodeDataInput in, final short version) + throws IOException { + return new DeleteModification(version, in.readYangInstanceIdentifier()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java index 465372736b..558641948a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java @@ -8,8 +8,9 @@ package org.opendaylight.controller.cluster.datastore.modification; -import java.io.ObjectInput; +import java.io.IOException; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -25,7 +26,7 @@ public class MergeModification extends WriteModification { this(DataStoreVersions.CURRENT_VERSION); } - public MergeModification(short version) { + public MergeModification(final short version) { super(version); } @@ -33,6 +34,10 @@ public class MergeModification extends WriteModification { super(path, data); } + MergeModification(final short version, final YangInstanceIdentifier path, final NormalizedNode data) { + super(version, path, data); + } + @Override public void apply(final DOMStoreWriteTransaction transaction) { transaction.merge(getPath(), getData()); @@ -48,9 +53,10 @@ public class MergeModification extends WriteModification { return MERGE; } - public static MergeModification fromStream(ObjectInput in, short version) { - MergeModification mod = new MergeModification(version); - mod.readExternal(in); - return mod; + public static MergeModification fromStream(final NormalizedNodeDataInput in, final short version) + throws IOException { + final NormalizedNode node = in.readNormalizedNode(); + final YangInstanceIdentifier path = in.readYangInstanceIdentifier(); + return new MergeModification(version, path, node); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java index b11357e61a..eddfba2ce1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java @@ -12,6 +12,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -59,4 +60,6 @@ public interface Modification extends Externalizable { @Override void readExternal(ObjectInput in) throws IOException, ClassNotFoundException; + + void writeTo(NormalizedNodeDataOutput out) throws IOException; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java index 485bb42a94..79e3b14961 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java @@ -17,8 +17,9 @@ import java.util.Collections; import java.util.List; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputOutput; -import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; @@ -35,19 +36,19 @@ public class MutableCompositeModification extends VersionedExternalizableMessage this(DataStoreVersions.CURRENT_VERSION); } - public MutableCompositeModification(short version) { + public MutableCompositeModification(final short version) { super(version); } @Override - public void apply(DOMStoreWriteTransaction transaction) { + public void apply(final DOMStoreWriteTransaction transaction) { for (Modification modification : modifications) { modification.apply(transaction); } } @Override - public void apply(DataTreeModification transaction) { + public void apply(final DataTreeModification transaction) { for (Modification modification : modifications) { modification.apply(transaction); } @@ -63,12 +64,12 @@ public class MutableCompositeModification extends VersionedExternalizableMessage * * @param modification the modification to add. */ - public void addModification(Modification modification) { + public void addModification(final Modification modification) { Preconditions.checkNotNull(modification); modifications.add(modification); } - public void addModifications(Iterable newMods) { + public void addModifications(final Iterable newMods) { for (Modification mod : newMods) { addModification(mod); } @@ -84,61 +85,56 @@ public class MutableCompositeModification extends VersionedExternalizableMessage } @Override - public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); int size = in.readInt(); - - if (size > 1) { - SerializationUtils.REUSABLE_READER_TL.set(NormalizedNodeInputOutput.newDataInputWithoutValidation(in)); - } - - try { + if (size > 0) { + final NormalizedNodeDataInput input = NormalizedNodeInputOutput.newDataInputWithoutValidation(in); for (int i = 0; i < size; i++) { byte type = in.readByte(); switch (type) { case Modification.WRITE: - modifications.add(WriteModification.fromStream(in, getVersion())); + modifications.add(WriteModification.fromStream(input, getVersion())); break; case Modification.MERGE: - modifications.add(MergeModification.fromStream(in, getVersion())); + modifications.add(MergeModification.fromStream(input, getVersion())); break; case Modification.DELETE: - modifications.add(DeleteModification.fromStream(in, getVersion())); + modifications.add(DeleteModification.fromStream(input, getVersion())); break; default: break; } } - } finally { - SerializationUtils.REUSABLE_READER_TL.remove(); } } @Override - public void writeExternal(ObjectOutput out) throws IOException { + public void writeExternal(final ObjectOutput out) throws IOException { super.writeExternal(out); - out.writeInt(modifications.size()); - - if (modifications.size() > 1) { - SerializationUtils.REUSABLE_WRITER_TL.set(NormalizedNodeInputOutput.newDataOutput(out)); - } - - try { - for (Modification mod: modifications) { - out.writeByte(mod.getType()); - mod.writeExternal(out); + final int size = modifications.size(); + out.writeInt(size); + if (size > 0) { + try (NormalizedNodeDataOutput stream = NormalizedNodeInputOutput.newDataOutput(out)) { + for (Modification mod : modifications) { + out.writeByte(mod.getType()); + mod.writeTo(stream); + } } - } finally { - SerializationUtils.REUSABLE_WRITER_TL.remove(); } } - public static MutableCompositeModification fromSerializable(Object serializable) { + public static MutableCompositeModification fromSerializable(final Object serializable) { Preconditions.checkArgument(serializable instanceof MutableCompositeModification); return (MutableCompositeModification)serializable; } + + @Override + public void writeTo(final NormalizedNodeDataOutput out) throws IOException { + throw new UnsupportedOperationException(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java index 70125e29e5..63fa74069a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java @@ -8,9 +8,12 @@ package org.opendaylight.controller.cluster.datastore.modification; +import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataOutput; import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils; import org.opendaylight.controller.cluster.datastore.node.utils.stream.SerializationUtils.Applier; import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; @@ -30,10 +33,15 @@ public class WriteModification extends AbstractModification { this(DataStoreVersions.CURRENT_VERSION); } - public WriteModification(short version) { + public WriteModification(final short version) { super(version); } + WriteModification(final short version, final YangInstanceIdentifier path, final NormalizedNode data) { + super(version, path); + this.data = data; + } + public WriteModification(final YangInstanceIdentifier path, final NormalizedNode data) { super(path); this.data = data; @@ -59,19 +67,26 @@ public class WriteModification extends AbstractModification { } @Override - public void readExternal(ObjectInput in) { + public void readExternal(final ObjectInput in) { SerializationUtils.deserializePathAndNode(in, this, APPLIER); } @Override - public void writeExternal(ObjectOutput out) { + public void writeExternal(final ObjectOutput out) { SerializationUtils.serializePathAndNode(getPath(), data, out); } - public static WriteModification fromStream(ObjectInput in, short version) { - WriteModification mod = new WriteModification(version); - mod.readExternal(in); - return mod; + public static WriteModification fromStream(final NormalizedNodeDataInput in, final short version) + throws IOException { + final NormalizedNode node = in.readNormalizedNode(); + final YangInstanceIdentifier path = in.readYangInstanceIdentifier(); + return new WriteModification(version, path, node); + } + + public void writeTo(final NormalizedNodeDataOutput out) throws IOException { + // FIXME: this should be inverted, as the path helps receivers in establishment of context + out.writeNormalizedNode(data); + out.writeYangInstanceIdentifier(getPath()); } private static final Applier APPLIER = (instance, path, node) -> {