Bug 2268: Use streaming for Modification payload 95/14495/6
authortpantelis <tpanteli@brocade.com>
Wed, 21 Jan 2015 19:45:42 +0000 (14:45 -0500)
committertpantelis <tpanteli@brocade.com>
Sun, 1 Feb 2015 15:29:26 +0000 (10:29 -0500)
Changed the *Modification classes to Externalizable and to use the
NormalizedNode streaming classes.

Added a new Payload implementation, ModificationPayload, that serializes the
Modification to a byte[].

Added ThreadLocals to SerializationUtils to reuse stream Reader/Writer
instances for efficiency for callers that serialize/deserialize multiple
objects.

Change-Id: Ib85af035d31027ffe4e2ddd5bcd49c1df1c6a42a
Signed-off-by: tpantelis <tpanteli@brocade.com>
22 files changed:
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ImmutableCompositeModification.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MergeModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/WriteModificationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/programs/appendentries/Client.java

index 05858a8..2c8e259 100644 (file)
@@ -21,7 +21,9 @@ import java.util.Map;
 import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +54,8 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private final Map<String, Integer> stringCodeMap = new HashMap<>();
 
+    private NormalizedNodeWriter normalizedNodeWriter;
+
     public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         output = new DataOutputStream(stream);
@@ -61,6 +65,18 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
         this.output = Preconditions.checkNotNull(output);
     }
 
+    private NormalizedNodeWriter normalizedNodeWriter() {
+        if(normalizedNodeWriter == null) {
+            normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(this);
+        }
+
+        return normalizedNodeWriter;
+    }
+
+    public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+        normalizedNodeWriter().write(node);
+    }
+
     @Override
     public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
         Preconditions.checkNotNull(name, "Node identifier should not be null");
index a3ef033..dea377a 100644 (file)
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -63,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
@@ -326,9 +328,9 @@ public class Shard extends RaftActor {
                 applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
             } else {
                 Shard.this.persistData(getSender(), transactionID,
-                        new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
+                        new ModificationPayload(cohortEntry.getModification()));
             }
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (InterruptedException | ExecutionException | IOException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
             shardMBean.incrementFailedTransactionsCount();
@@ -684,7 +686,13 @@ public class Shard extends RaftActor {
 
     @Override
     protected void appendRecoveredLogEntry(final Payload data) {
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        } else if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
         } else if (data instanceof CompositeModificationByteStringPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
@@ -761,7 +769,14 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
 
-        if (data instanceof CompositeModificationPayload) {
+        if(data instanceof ModificationPayload) {
+            try {
+                applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
+            } catch (ClassNotFoundException | IOException e) {
+                LOG.error(e, "Error extracting ModificationPayload");
+            }
+        }
+        else if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
@@ -769,7 +784,6 @@ public class Shard extends RaftActor {
             Object modification = ((CompositeModificationByteStringPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
-
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -788,8 +802,7 @@ public class Shard extends RaftActor {
         } else if(clientActor == null) {
             // There's no clientActor to which to send a commit reply so we must be applying
             // replicated state from the leader.
-            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                    modification, schemaContext));
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
         } else {
             // This must be the OK to commit after replication consensus.
             finishCommit(clientActor, identifier);
index 94fb584..9457456 100644 (file)
@@ -121,7 +121,7 @@ class ShardRecoveryCoordinator {
         public void run() {
             for(int i = 0; i < logEntries.size(); i++) {
                 MutableCompositeModification.fromSerializable(
-                        logEntries.get(i), schemaContext).apply(resultingTx);
+                        logEntries.get(i)).apply(resultingTx);
                 // Null out to GC quicker.
                 logEntries.set(i, null);
             }
index 95c7ae1..a4a2f45 100644 (file)
@@ -23,7 +23,6 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
@@ -82,8 +81,7 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
-            getSender().tell(new GetCompositeModificationReply(
-                    new ImmutableCompositeModification(modification)), getSelf());
+            getSender().tell(new GetCompositeModificationReply(modification), getSelf());
         } else {
             super.handleReceive(message);
         }
@@ -94,7 +92,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         LOG.debug("writeData at path : {}", message.getPath());
 
         modification.addModification(
-                new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
+                new WriteModification(message.getPath(), message.getData()));
         try {
             transaction.write(message.getPath(), message.getData());
             WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
@@ -110,7 +108,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         LOG.debug("mergeData at path : {}", message.getPath());
 
         modification.addModification(
-                new MergeModification(message.getPath(), message.getData(), getSchemaContext()));
+                new MergeModification(message.getPath(), message.getData()));
 
         try {
             transaction.merge(message.getPath(), message.getData());
index 4f4f0fb..f04d004 100644 (file)
@@ -11,22 +11,24 @@ package org.opendaylight.controller.cluster.datastore.modification;
 
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import java.io.Serializable;
-
 /**
  * Base class to be used for all simple modifications that can be applied to a DOMStoreTransaction
  */
-public abstract class AbstractModification implements Modification,
-    Serializable {
+public abstract class AbstractModification implements Modification {
 
-    private static final long serialVersionUID = 1638042650152084457L;
+    private YangInstanceIdentifier path;
 
-    protected final YangInstanceIdentifier path;
+    protected AbstractModification() {
+    }
 
     protected AbstractModification(YangInstanceIdentifier path) {
         this.path = path;
     }
 
+    protected void setPath(YangInstanceIdentifier path) {
+        this.path = path;
+    }
+
     public YangInstanceIdentifier getPath() {
         return path;
     }
index 056fe75..833f86f 100644 (file)
@@ -8,7 +8,12 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -19,23 +24,51 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 public class DeleteModification extends AbstractModification {
     private static final long serialVersionUID = 1L;
 
+    public DeleteModification() {
+    }
+
     public DeleteModification(YangInstanceIdentifier path) {
         super(path);
     }
 
     @Override
     public void apply(DOMStoreWriteTransaction transaction) {
-        transaction.delete(path);
+        transaction.delete(getPath());
     }
 
     @Override
+    public byte getType() {
+        return DELETE;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort();
+        setPath(SerializationUtils.deserializePath(in));
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+        SerializationUtils.serializePath(getPath(), out);
+    }
+
+    @Override
+    @Deprecated
     public Object toSerializable() {
         return PersistentMessages.Modification.newBuilder().setType(this.getClass().toString())
-                .setPath(InstanceIdentifierUtils.toSerializable(this.path)).build();
+                .setPath(InstanceIdentifierUtils.toSerializable(getPath())).build();
     }
 
+    @Deprecated
     public static DeleteModification fromSerializable(Object serializable) {
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
         return new DeleteModification(InstanceIdentifierUtils.fromSerializable(o.getPath()));
     }
+
+    public static DeleteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
+        DeleteModification mod = new DeleteModification();
+        mod.readExternal(in);
+        return mod;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ImmutableCompositeModification.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ImmutableCompositeModification.java
deleted file mode 100644 (file)
index 2d11500..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore.modification;
-
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-
-import java.util.List;
-
-public class ImmutableCompositeModification implements CompositeModification {
-
-    private final CompositeModification modification;
-
-    public ImmutableCompositeModification(CompositeModification modification) {
-        this.modification = modification;
-    }
-
-    @Override
-    public List<Modification> getModifications() {
-        return modification.getModifications();
-    }
-
-    @Override
-    public void apply(DOMStoreWriteTransaction transaction) {
-        modification.apply(transaction);
-    }
-
-    @Override public Object toSerializable() {
-
-        PersistentMessages.CompositeModification.Builder builder =
-            PersistentMessages.CompositeModification.newBuilder();
-
-        for (Modification m : modification.getModifications()) {
-            builder.addModification(
-                (PersistentMessages.Modification) m.toSerializable());
-        }
-
-        return builder.build();
-    }
-}
index 2f9d776..571443e 100644 (file)
@@ -8,13 +8,14 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
+import java.io.ObjectInput;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * MergeModification stores all the parameters required to merge data into the specified path
@@ -22,19 +23,33 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class MergeModification extends WriteModification {
     private static final long serialVersionUID = 1L;
 
-    public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
-        final SchemaContext schemaContext) {
-        super(path, data, schemaContext);
+    public MergeModification() {
+    }
+
+    public MergeModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        super(path, data);
     }
 
     @Override
     public void apply(final DOMStoreWriteTransaction transaction) {
-        transaction.merge(path, data);
+        transaction.merge(getPath(), getData());
     }
 
-    public static MergeModification fromSerializable(final Object serializable, final SchemaContext schemaContext) {
+    @Override
+    public byte getType() {
+        return MERGE;
+    }
+
+    @Deprecated
+    public static MergeModification fromSerializable(final Object serializable) {
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(o.getPath(), o.getData());
-        return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+        Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(o.getPath(), o.getData());
+        return new MergeModification(decoded.getDecodedPath(), decoded.getDecodedNode());
+    }
+
+    public static MergeModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
+        MergeModification mod = new MergeModification();
+        mod.readExternal(in);
+        return mod;
     }
 }
index ed9b1fe..2dfcdf0 100644 (file)
@@ -8,7 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
-import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
+import java.io.Externalizable;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
 /**
@@ -25,10 +25,22 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
  * which can then be applied to a write transaction
  * </p>
  */
-public interface Modification extends SerializableMessage {
-  /**
-   * Apply the modification to the specified transaction
-   * @param transaction
-   */
-  void apply(DOMStoreWriteTransaction transaction);
+public interface Modification extends Externalizable {
+
+    byte COMPOSITE = 1;
+    byte WRITE = 2;
+    byte MERGE = 3;
+    byte DELETE = 4;
+
+    /**
+     * Apply the modification to the specified transaction
+     *
+     * @param transaction
+     */
+    void apply(DOMStoreWriteTransaction transaction);
+
+    byte getType();
+
+    @Deprecated
+    Object toSerializable();
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayload.java
new file mode 100644 (file)
index 0000000..2e39157
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.modification;
+
+import com.google.protobuf.GeneratedMessage.GeneratedExtension;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.Map;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries.ReplicatedLogEntry;
+
+/**
+ * Payload implementation for MutableCompositeModification used for persistence and replication.
+ *
+ * @author Thomas Pantelis
+ */
+public class ModificationPayload extends Payload implements Externalizable {
+    private static final long serialVersionUID = 1L;
+
+    private transient byte[] serializedPayload;
+
+    public ModificationPayload() {
+    }
+
+    public ModificationPayload(Modification from) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        ObjectOutputStream out = new ObjectOutputStream(bos);
+        out.writeObject(from);
+        out.close();
+        serializedPayload = bos.toByteArray();
+    }
+
+    public Modification getModification() throws IOException, ClassNotFoundException {
+        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedPayload));
+        Modification to = (Modification) in.readObject();
+        in.close();
+        return to;
+    }
+
+    @Override
+    public int size() {
+        return serializedPayload.length;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        int size = in.readInt();
+        serializedPayload = new byte[size];
+        in.readFully(serializedPayload);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(serializedPayload.length);
+        out.write(serializedPayload);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    @Deprecated
+    public <T> Map<GeneratedExtension, T> encode() {
+        return null;
+    }
+
+    @Override
+    @Deprecated
+    public Payload decode(ReplicatedLogEntry.Payload payload) {
+        return null;
+    }
+}
index 04854d2..5d7947b 100644 (file)
@@ -8,24 +8,30 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
-import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
 /**
  * MutableCompositeModification is just a mutable version of a
  * CompositeModification {@link org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification#addModification(Modification)}
  */
-public class MutableCompositeModification
-    implements CompositeModification {
+public class MutableCompositeModification implements CompositeModification {
+    private static final long serialVersionUID = 1L;
 
-    private static final long serialVersionUID = 1163377899140186790L;
+    private final List<Modification> modifications;
 
-    private final List<Modification> modifications = new ArrayList<>();
+    public MutableCompositeModification() {
+        modifications = new ArrayList<>();
+    }
 
     @Override
     public void apply(DOMStoreWriteTransaction transaction) {
@@ -34,6 +40,11 @@ public class MutableCompositeModification
         }
     }
 
+    @Override
+    public byte getType() {
+        return COMPOSITE;
+    }
+
     /**
      * Add a new Modification to the list of Modifications represented by this
      * composite
@@ -44,25 +55,88 @@ public class MutableCompositeModification
         modifications.add(modification);
     }
 
+    @Override
     public List<Modification> getModifications() {
-        return Collections.unmodifiableList(modifications);
+        return modifications;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort();
+
+        int size = in.readInt();
+
+        if(size > 1) {
+            SerializationUtils.REUSABLE_READER_TL.set(new NormalizedNodeInputStreamReader(in));
+        }
+
+        try {
+            for(int i = 0; i < size; i++) {
+                byte type = in.readByte();
+                switch(type) {
+                case Modification.WRITE:
+                    modifications.add(WriteModification.fromStream(in));
+                    break;
+
+                case Modification.MERGE:
+                    modifications.add(MergeModification.fromStream(in));
+                    break;
+
+                case Modification.DELETE:
+                    modifications.add(DeleteModification.fromStream(in));
+                    break;
+                }
+            }
+        } finally {
+            SerializationUtils.REUSABLE_READER_TL.remove();
+        }
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+
+        out.writeInt(modifications.size());
+
+        if(modifications.size() > 1) {
+            SerializationUtils.REUSABLE_WRITER_TL.set(new NormalizedNodeOutputStreamWriter(out));
+        }
+
+        try {
+            for(Modification mod: modifications) {
+                out.writeByte(mod.getType());
+                mod.writeExternal(out);
+            }
+        } finally {
+            SerializationUtils.REUSABLE_WRITER_TL.remove();
+        }
     }
 
-    @Override public Object toSerializable() {
+    @Override
+    @Deprecated
+    public Object toSerializable() {
         PersistentMessages.CompositeModification.Builder builder =
-            PersistentMessages.CompositeModification.newBuilder();
+                PersistentMessages.CompositeModification.newBuilder();
 
         builder.setTimeStamp(System.nanoTime());
 
         for (Modification m : modifications) {
             builder.addModification(
-                (PersistentMessages.Modification) m.toSerializable());
+                    (PersistentMessages.Modification) m.toSerializable());
         }
 
         return builder.build();
     }
 
-    public static MutableCompositeModification fromSerializable(Object serializable, SchemaContext schemaContext){
+    public static MutableCompositeModification fromSerializable(Object serializable) {
+        if(serializable instanceof MutableCompositeModification) {
+            return (MutableCompositeModification)serializable;
+        } else {
+            return fromLegacySerializable(serializable);
+        }
+    }
+
+    private static MutableCompositeModification fromLegacySerializable(Object serializable) {
         PersistentMessages.CompositeModification o = (PersistentMessages.CompositeModification) serializable;
         MutableCompositeModification compositeModification = new MutableCompositeModification();
 
@@ -70,9 +144,9 @@ public class MutableCompositeModification
             if(m.getType().equals(DeleteModification.class.toString())){
                 compositeModification.addModification(DeleteModification.fromSerializable(m));
             } else if(m.getType().equals(WriteModification.class.toString())){
-                compositeModification.addModification(WriteModification.fromSerializable(m, schemaContext));
+                compositeModification.addModification(WriteModification.fromSerializable(m));
             } else if(m.getType().equals(MergeModification.class.toString())){
-                compositeModification.addModification(MergeModification.fromSerializable(m, schemaContext));
+                compositeModification.addModification(MergeModification.fromSerializable(m));
             }
         }
 
index b296408..9c122c9 100644 (file)
@@ -8,32 +8,39 @@
 
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Decoded;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils.Applier;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * WriteModification stores all the parameters required to write data to the specified path
  */
 public class WriteModification extends AbstractModification {
     private static final long serialVersionUID = 1L;
-    protected final NormalizedNode<?, ?> data;
-    private final SchemaContext schemaContext;
 
-    public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data, final SchemaContext schemaContext) {
+    private NormalizedNode<?, ?> data;
+
+    public WriteModification() {
+    }
+
+    public WriteModification(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         super(path);
         this.data = data;
-        this.schemaContext = schemaContext;
     }
 
     @Override
     public void apply(final DOMStoreWriteTransaction transaction) {
-        transaction.write(path, data);
+        transaction.write(getPath(), data);
     }
 
     public NormalizedNode<?, ?> getData() {
@@ -41,19 +48,51 @@ public class WriteModification extends AbstractModification {
     }
 
     @Override
-    public Object toSerializable() {
-        Encoded encoded = new NormalizedNodeToNodeCodec(schemaContext).encode(path, data);
+    public byte getType() {
+        return WRITE;
+    }
 
-        return PersistentMessages.Modification.newBuilder()
-                .setType(this.getClass().toString())
-                .setPath(encoded.getEncodedPath())
-                .setData(encoded.getEncodedNode().getNormalizedNode())
-                .build();
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        in.readShort(); // version
+
+        SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
-    public static WriteModification fromSerializable(final Object serializable, final SchemaContext schemaContext) {
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(DataStoreVersions.CURRENT_VERSION);
+        SerializationUtils.serializePathAndNode(getPath(), data, out);
+    }
+
+    @Override
+    @Deprecated
+    public Object toSerializable() {
+        Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(getPath(), getData());
+        return PersistentMessages.Modification.newBuilder().setType(this.getClass().toString())
+                .setPath(encoded.getEncodedPath()).setData(encoded.getEncodedNode()
+                        .getNormalizedNode()).build();
+    }
+
+    @Deprecated
+    public static WriteModification fromSerializable(final Object serializable) {
         PersistentMessages.Modification o = (PersistentMessages.Modification) serializable;
-        Decoded decoded = new NormalizedNodeToNodeCodec(schemaContext).decode(o.getPath(), o.getData());
-        return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode(), schemaContext);
+        Decoded decoded = new NormalizedNodeToNodeCodec(null).decode(o.getPath(), o.getData());
+        return new WriteModification(decoded.getDecodedPath(), decoded.getDecodedNode());
+    }
+
+    public static WriteModification fromStream(ObjectInput in) throws ClassNotFoundException, IOException {
+        WriteModification mod = new WriteModification();
+        mod.readExternal(in);
+        return mod;
     }
+
+    private static final Applier<WriteModification> APPLIER = new Applier<WriteModification>() {
+        @Override
+        public void apply(WriteModification instance, YangInstanceIdentifier path,
+                NormalizedNode<?, ?> node) {
+            instance.setPath(path);
+            instance.data = node;
+        }
+    };
 }
index 87c78bd..189bbea 100644 (file)
@@ -15,7 +15,6 @@ import org.opendaylight.controller.cluster.datastore.node.utils.stream.Normalize
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 
 /**
  * Provides various utility methods for serialization and de-serialization.
@@ -23,18 +22,38 @@ import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWrit
  * @author Thomas Pantelis
  */
 public final class SerializationUtils {
+    public static ThreadLocal<NormalizedNodeOutputStreamWriter> REUSABLE_WRITER_TL = new ThreadLocal<>();
+    public static ThreadLocal<NormalizedNodeInputStreamReader> REUSABLE_READER_TL = new ThreadLocal<>();
 
     public static interface Applier<T> {
         void apply(T instance, YangInstanceIdentifier path, NormalizedNode<?, ?> node);
     }
 
+    private static NormalizedNodeOutputStreamWriter streamWriter(DataOutput out) throws IOException {
+        NormalizedNodeOutputStreamWriter streamWriter = REUSABLE_WRITER_TL.get();
+        if(streamWriter == null) {
+            streamWriter = new NormalizedNodeOutputStreamWriter(out);
+        }
+
+        return streamWriter;
+    }
+
+    private static NormalizedNodeInputStreamReader streamReader(DataInput in) throws IOException {
+        NormalizedNodeInputStreamReader streamWriter = REUSABLE_READER_TL.get();
+        if(streamWriter == null) {
+            streamWriter = new NormalizedNodeInputStreamReader(in);
+        }
+
+        return streamWriter;
+    }
+
     public static void serializePathAndNode(YangInstanceIdentifier path, NormalizedNode<?, ?> node,
             DataOutput out) {
         Preconditions.checkNotNull(path);
         Preconditions.checkNotNull(node);
         try {
-            NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
-            NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+            NormalizedNodeOutputStreamWriter streamWriter = streamWriter(out);
+            streamWriter.writeNormalizedNode(node);
             streamWriter.writeYangInstanceIdentifier(path);
         } catch (IOException e) {
             throw new IllegalArgumentException(String.format("Error serializing path %s and Node %s",
@@ -44,7 +63,7 @@ public final class SerializationUtils {
 
     public static <T> void deserializePathAndNode(DataInput in, T instance, Applier<T> applier) {
         try {
-            NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+            NormalizedNodeInputStreamReader streamReader = streamReader(in);
             NormalizedNode<?, ?> node = streamReader.readNormalizedNode();
             YangInstanceIdentifier path = streamReader.readYangInstanceIdentifier();
             applier.apply(instance, path, node);
@@ -57,8 +76,8 @@ public final class SerializationUtils {
         try {
             out.writeBoolean(node != null);
             if(node != null) {
-                NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
-                NormalizedNodeWriter.forStreamWriter(streamWriter).write(node);
+                NormalizedNodeOutputStreamWriter streamWriter = streamWriter(out);
+                streamWriter.writeNormalizedNode(node);
             }
         } catch (IOException e) {
             throw new IllegalArgumentException(String.format("Error serializing NormalizedNode %s",
@@ -70,7 +89,7 @@ public final class SerializationUtils {
             try {
                 boolean present = in.readBoolean();
                 if(present) {
-                    NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+                    NormalizedNodeInputStreamReader streamReader = streamReader(in);
                     return streamReader.readNormalizedNode();
                 }
             } catch (IOException e) {
@@ -83,16 +102,16 @@ public final class SerializationUtils {
     public static void serializePath(YangInstanceIdentifier path, DataOutput out) {
         Preconditions.checkNotNull(path);
         try {
-            NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+            NormalizedNodeOutputStreamWriter streamWriter = streamWriter(out);
             streamWriter.writeYangInstanceIdentifier(path);
         } catch (IOException e) {
-            throw new IllegalArgumentException(String.format("Error serializing path {}", path), e);
+            throw new IllegalArgumentException(String.format("Error serializing path %s", path), e);
         }
     }
 
     public static YangInstanceIdentifier deserializePath(DataInput in) {
         try {
-            NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+            NormalizedNodeInputStreamReader streamReader = streamReader(in);
             return streamReader.readYangInstanceIdentifier();
         } catch (IOException e) {
             throw new IllegalArgumentException("Error deserializing path", e);
index db9f3d1..5a5387f 100644 (file)
@@ -14,7 +14,6 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -33,8 +32,7 @@ public class CompositeModificationByteStringPayloadTest {
     public void testSerialization(){
         WriteModification writeModification =
                 new WriteModification(TestModel.TEST_PATH, ImmutableNodes
-                        .containerNode(TestModel.TEST_QNAME),
-                        TestModel.createTestContext());
+                        .containerNode(TestModel.TEST_QNAME));
 
         MutableCompositeModification compositeModification =
                 new MutableCompositeModification();
@@ -56,28 +54,20 @@ public class CompositeModificationByteStringPayloadTest {
     public void testAppendEntries(){
         List<ReplicatedLogEntry> entries = new ArrayList<>();
 
-        CompositeModificationByteStringPayload payload = newByteStringPayload(
-                new WriteModification(TestModel.OUTER_LIST_PATH,
-                        ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                        SCHEMA_CONTEXT));
+        WriteModification writeModification = new WriteModification(TestModel.OUTER_LIST_PATH,
+                ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-        payload.clearModificationReference();
-
-        entries.add(new ReplicatedLogImplEntry(0, 1, payload));
+        MutableCompositeModification compositeModification = new MutableCompositeModification();
 
+        compositeModification.addModification(writeModification);
 
-        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
-    }
-
+        CompositeModificationByteStringPayload payload =
+                new CompositeModificationByteStringPayload(compositeModification.toSerializable());
 
+        payload.clearModificationReference();
 
-    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        for(Modification mod: mods) {
-            compMod.addModification(mod);
-        }
+        entries.add(new ReplicatedLogImplEntry(0, 1, payload));
 
-        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
     }
-
 }
index cd74167..6fca38a 100644 (file)
@@ -1,12 +1,10 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
@@ -22,17 +20,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 public class CompositeModificationPayloadTest {
 
-
-    private static final String SERIALIZE_OUT = "serialize.out";
-
-    @After
-    public void shutDown(){
-        File f = new File(SERIALIZE_OUT);
-        if(f.exists()){
-            f.delete();
-        }
-    }
-
     @Test
     public void testBasic() throws IOException {
 
@@ -42,8 +29,7 @@ public class CompositeModificationPayloadTest {
             @Override public Payload getData() {
                 WriteModification writeModification =
                     new WriteModification(TestModel.TEST_PATH, ImmutableNodes
-                        .containerNode(TestModel.TEST_QNAME),
-                        TestModel.createTestContext());
+                        .containerNode(TestModel.TEST_QNAME));
 
                 MutableCompositeModification compositeModification =
                     new MutableCompositeModification();
@@ -73,11 +59,12 @@ public class CompositeModificationPayloadTest {
         AppendEntriesMessages.AppendEntries o = (AppendEntriesMessages.AppendEntries)
                 appendEntries.toSerializable(RaftVersions.HELIUM_VERSION);
 
-        o.writeDelimitedTo(new FileOutputStream(SERIALIZE_OUT));
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        o.writeDelimitedTo(bos);
 
         AppendEntriesMessages.AppendEntries appendEntries2 =
             AppendEntriesMessages.AppendEntries
-                .parseDelimitedFrom(new FileInputStream(SERIALIZE_OUT));
+                .parseDelimitedFrom(new ByteArrayInputStream(bos.toByteArray()));
 
         AppendEntries appendEntries1 = AppendEntries.fromSerializable(appendEntries2);
 
@@ -85,7 +72,5 @@ public class CompositeModificationPayloadTest {
 
 
         Assert.assertTrue(((CompositeModificationPayload) data).getModification().toString().contains(TestModel.TEST_QNAME.getNamespace().toString()));
-
     }
-
 }
index 42f3043..14fc3a1 100644 (file)
@@ -60,6 +60,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
@@ -78,7 +79,6 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -394,11 +394,26 @@ public class ShardTest extends AbstractActorTest {
 
         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        MutableCompositeModification compMod = new MutableCompositeModification();
-        compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
-        Payload payload = new CompositeModificationPayload(compMod.toSerializable());
-        ApplyState applyState = new ApplyState(null, "test",
-                new ReplicatedLogImplEntry(1, 2, payload));
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                newModificationPayload(new WriteModification(TestModel.TEST_PATH, node))));
+
+        shard.underlyingActor().onReceiveCommand(applyState);
+
+        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
+
+    @Test
+    public void testApplyStateLegacy() throws Exception {
+
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyStateLegacy");
+
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
 
         shard.underlyingActor().onReceiveCommand(applyState);
 
@@ -434,36 +449,46 @@ public class ShardTest extends AbstractActorTest {
 
         // Set up the InMemoryJournal.
 
-        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
                   new WriteModification(TestModel.OUTER_LIST_PATH,
-                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
-                          SCHEMA_CONTEXT))));
+                          ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
 
         int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
-        for(int i = 1; i <= nListEntries-5; i++) {
+        int i = 1;
+
+        // Add some of the legacy CompositeModificationPayload
+        for(; i <= 2; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
-                    SCHEMA_CONTEXT);
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newPayload(mod)));
+                    newLegacyPayload(mod)));
         }
 
-        // Add some of the new CompositeModificationByteStringPayload
-        for(int i = 11; i <= nListEntries; i++) {
+        // Add some of the legacy CompositeModificationByteStringPayload
+        for(; i <= 5; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
             Modification mod = new MergeModification(path,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
-                    SCHEMA_CONTEXT);
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
-                    newByteStringPayload(mod)));
+                    newLegacyByteStringPayload(mod)));
         }
 
+        // Add some of the ModificationPayload
+        for(; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newModificationPayload(mod)));
+        }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
                 new ApplyLogEntries(nListEntries));
@@ -527,7 +552,7 @@ public class ShardTest extends AbstractActorTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    private CompositeModificationPayload newPayload(final Modification... mods) {
+    private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
@@ -536,7 +561,7 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationPayload(compMod.toSerializable());
     }
 
-    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+    private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
@@ -545,6 +570,14 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationByteStringPayload(compMod.toSerializable());
     }
 
+    private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new ModificationPayload(compMod);
+    }
 
     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
@@ -594,7 +627,7 @@ public class ShardTest extends AbstractActorTest {
             }
         }).when(cohort).abort();
 
-        modification.addModification(new WriteModification(path, data, SCHEMA_CONTEXT));
+        modification.addModification(new WriteModification(path, data));
 
         return cohort;
     }
index b33f902..9daaa0d 100644 (file)
@@ -1,35 +1,49 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import static org.junit.Assert.assertEquals;
 import com.google.common.base.Optional;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
-public class DeleteModificationTest extends AbstractModificationTest{
-
-  @Test
-  public void testApply() throws Exception {
-    //Write something into the datastore
-    DOMStoreReadWriteTransaction writeTransaction = store.newReadWriteTransaction();
-    WriteModification writeModification = new WriteModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext());
-    writeModification.apply(writeTransaction);
-    commitTransaction(writeTransaction);
-
-    //Check if it's in the datastore
-    Optional<NormalizedNode<?,?>> data = readData(TestModel.TEST_PATH);
-    Assert.assertTrue(data.isPresent());
-
-    //Delete stuff from the datastore
-    DOMStoreWriteTransaction deleteTransaction = store.newWriteOnlyTransaction();
-    DeleteModification deleteModification = new DeleteModification(TestModel.TEST_PATH);
-    deleteModification.apply(deleteTransaction);
-    commitTransaction(deleteTransaction);
-
-    data = readData(TestModel.TEST_PATH);
-    Assert.assertFalse(data.isPresent());
-  }
+public class DeleteModificationTest extends AbstractModificationTest {
+
+    @Test
+    public void testApply() throws Exception {
+        // Write something into the datastore
+        DOMStoreReadWriteTransaction writeTransaction = store.newReadWriteTransaction();
+        WriteModification writeModification = new WriteModification(TestModel.TEST_PATH,
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+        writeModification.apply(writeTransaction);
+        commitTransaction(writeTransaction);
+
+        // Check if it's in the datastore
+        Optional<NormalizedNode<?, ?>> data = readData(TestModel.TEST_PATH);
+        Assert.assertTrue(data.isPresent());
+
+        // Delete stuff from the datastore
+        DOMStoreWriteTransaction deleteTransaction = store.newWriteOnlyTransaction();
+        DeleteModification deleteModification = new DeleteModification(TestModel.TEST_PATH);
+        deleteModification.apply(deleteTransaction);
+        commitTransaction(deleteTransaction);
+
+        data = readData(TestModel.TEST_PATH);
+        Assert.assertFalse(data.isPresent());
+    }
+
+    @Test
+    public void testSerialization() {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+        DeleteModification expected = new DeleteModification(path);
+
+        DeleteModification clone = (DeleteModification) SerializationUtils.clone(expected);
+        assertEquals("getPath", expected.getPath(), clone.getPath());
+    }
 }
index 5d20211..a69d938 100644 (file)
@@ -1,13 +1,16 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import static org.junit.Assert.assertEquals;
 import com.google.common.base.Optional;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 public class MergeModificationTest extends AbstractModificationTest{
 
@@ -17,7 +20,8 @@ public class MergeModificationTest extends AbstractModificationTest{
 
         //Write something into the datastore
         DOMStoreReadWriteTransaction writeTransaction = store.newReadWriteTransaction();
-        MergeModification writeModification = new MergeModification(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext());
+        MergeModification writeModification = new MergeModification(TestModel.TEST_PATH,
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
         writeModification.apply(writeTransaction);
         commitTransaction(writeTransaction);
 
@@ -29,16 +33,15 @@ public class MergeModificationTest extends AbstractModificationTest{
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        MergeModification mergeModification = new MergeModification(TestModel.TEST_PATH,
-                node, schemaContext);
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        Object serialized = mergeModification.toSerializable();
+        MergeModification expected = new MergeModification(path, data);
 
-        MergeModification newModification = MergeModification.fromSerializable(serialized, schemaContext);
-
-        Assert.assertEquals("getPath", TestModel.TEST_PATH, newModification.getPath());
-        Assert.assertEquals("getData", node, newModification.getData());
+        MergeModification clone = (MergeModification) SerializationUtils.clone(expected);
+        assertEquals("getPath", expected.getPath(), clone.getPath());
+        assertEquals("getData", expected.getData(), clone.getData());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java
new file mode 100644 (file)
index 0000000..bbfff70
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.modification;
+
+import static org.junit.Assert.assertEquals;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+/**
+ * Unit tests for ModificationPayload.
+ *
+ * @author Thomas Pantelis
+ */
+public class ModificationPayloadTest {
+
+    @Test
+    public void test() throws Exception {
+
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        MutableCompositeModification compositeModification = new MutableCompositeModification();
+        compositeModification.addModification(new WriteModification(writePath, writeData));
+
+        ModificationPayload payload = new ModificationPayload(compositeModification);
+
+        MutableCompositeModification deserialized = (MutableCompositeModification) payload.getModification();
+
+        assertEquals("getModifications size", 1, deserialized.getModifications().size());
+        WriteModification write = (WriteModification)deserialized.getModifications().get(0);
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+
+        ModificationPayload cloned =
+                (ModificationPayload) SerializationUtils.clone(payload);
+
+        deserialized = (MutableCompositeModification) payload.getModification();
+
+        assertEquals("getModifications size", 1, deserialized.getModifications().size());
+        write = (WriteModification)deserialized.getModifications().get(0);
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+    }
+}
index f8116aa..8ae2a86 100644 (file)
@@ -1,15 +1,18 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 public class MutableCompositeModificationTest extends AbstractModificationTest {
 
@@ -18,7 +21,7 @@ public class MutableCompositeModificationTest extends AbstractModificationTest {
 
         MutableCompositeModification compositeModification = new MutableCompositeModification();
         compositeModification.addModification(new WriteModification(TestModel.TEST_PATH,
-            ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()));
+            ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
 
         DOMStoreReadWriteTransaction transaction = store.newReadWriteTransaction();
         compositeModification.apply(transaction);
@@ -31,13 +34,68 @@ public class MutableCompositeModificationTest extends AbstractModificationTest {
     }
 
     @Test
-    public void testEverySerializedCompositeModificationObjectMustBeDifferent(){
+    public void testSerialization() {
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
+        NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
+
+        YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
+
         MutableCompositeModification compositeModification = new MutableCompositeModification();
-        compositeModification.addModification(new WriteModification(TestModel.TEST_PATH,
-            ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()));
-        Object one = compositeModification.toSerializable();
-        try{Thread.sleep(10);}catch(Exception err){}
-        Object two = compositeModification.toSerializable();
-        assertNotEquals(one,two);
+        compositeModification.addModification(new WriteModification(writePath, writeData));
+        compositeModification.addModification(new MergeModification(mergePath, mergeData));
+        compositeModification.addModification(new DeleteModification(deletePath));
+
+        MutableCompositeModification clone = (MutableCompositeModification) SerializationUtils.clone(compositeModification);
+
+        assertEquals("getModifications size", 3, clone.getModifications().size());
+
+        WriteModification write = (WriteModification)clone.getModifications().get(0);
+        assertEquals("getPath", writePath, write.getPath());
+        assertEquals("getData", writeData, write.getData());
+
+        MergeModification merge = (MergeModification)clone.getModifications().get(1);
+        assertEquals("getPath", mergePath, merge.getPath());
+        assertEquals("getData", mergeData, merge.getData());
+
+        DeleteModification delete = (DeleteModification)clone.getModifications().get(2);
+        assertEquals("getPath", deletePath, delete.getPath());
+    }
+
+    @Test
+    @Ignore
+    public void testSerializationScale() throws Exception {
+        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+        MutableCompositeModification compositeModification = new MutableCompositeModification();
+        for(int i = 0; i < 1000; i++) {
+            compositeModification.addModification(new WriteModification(writePath, writeData));
+        }
+
+        Stopwatch sw = new Stopwatch();
+        sw.start();
+        for(int i = 0; i < 1000; i++) {
+            new ModificationPayload(compositeModification);
+        }
+
+        sw.stop();
+        System.out.println("Elapsed: "+sw);
+
+        ModificationPayload p = new ModificationPayload(compositeModification);
+        sw.start();
+        for(int i = 0; i < 1000; i++) {
+            p.getModification();
+        }
+
+        sw.stop();
+        System.out.println("Elapsed: "+sw);
     }
 }
index 3a82fff..2e9ce22 100644 (file)
@@ -1,22 +1,25 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
+import static org.junit.Assert.assertEquals;
 import com.google.common.base.Optional;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
-public class WriteModificationTest extends AbstractModificationTest{
+public class WriteModificationTest extends AbstractModificationTest {
 
     @Test
     public void testApply() throws Exception {
         //Write something into the datastore
         DOMStoreReadWriteTransaction writeTransaction = store.newReadWriteTransaction();
         WriteModification writeModification = new WriteModification(TestModel.TEST_PATH,
-                ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext());
+                ImmutableNodes.containerNode(TestModel.TEST_QNAME));
         writeModification.apply(writeTransaction);
         commitTransaction(writeTransaction);
 
@@ -27,16 +30,15 @@ public class WriteModificationTest extends AbstractModificationTest{
 
     @Test
     public void testSerialization() {
-        SchemaContext schemaContext = TestModel.createTestContext();
-        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-        WriteModification writeModification = new WriteModification(TestModel.TEST_PATH,
-                node, schemaContext);
-
-        Object serialized = writeModification.toSerializable();
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+        NormalizedNode<?, ?> data = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
 
-        WriteModification newModification = WriteModification.fromSerializable(serialized, schemaContext);
+        WriteModification expected = new WriteModification(path, data);
 
-        Assert.assertEquals("getPath", TestModel.TEST_PATH, newModification.getPath());
-        Assert.assertEquals("getData", node, newModification.getData());
+        WriteModification clone = (WriteModification) SerializationUtils.clone(expected);
+        assertEquals("getPath", expected.getPath(), clone.getPath());
+        assertEquals("getData", expected.getData(), clone.getData());
     }
 }
index a3041e8..79c1bb4 100644 (file)
@@ -20,6 +20,7 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -72,16 +73,14 @@ public class Client {
             @Override public Payload getData() {
                 WriteModification writeModification =
                     new WriteModification(TestModel.TEST_PATH, ImmutableNodes
-                        .containerNode(TestModel.TEST_QNAME),
-                        TestModel.createTestContext()
-                    );
+                        .containerNode(TestModel.TEST_QNAME));
 
                 MutableCompositeModification compositeModification =
                     new MutableCompositeModification();
 
                 compositeModification.addModification(writeModification);
 
-                return new CompositeModificationPayload(
+                return new CompositeModificationByteStringPayload(
                     compositeModification.toSerializable());
             }
 

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.