Bug 2265: Use streaming for DeleteData message 76/14476/3
authortpantelis <tpanteli@brocade.com>
Wed, 21 Jan 2015 18:31:12 +0000 (13:31 -0500)
committertpantelis <tpanteli@brocade.com>
Thu, 22 Jan 2015 00:23:38 +0000 (19:23 -0500)
The WriteData and MergeData messages were changed to use streaming. We
might as well do it for DeleteData as well as the YangInstanceIdentifier
streaming is faster.

Change-Id: Ie48662ecc5c8a83734f3155ca8067d25bb153058
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteData.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java [new file with mode: 0644]

index 2e43219523e0d34b5b015b0ef0fc39370dbf89b8..95c7ae10c0c4f394a219cd4ae2afec78c62ed1c7 100644 (file)
@@ -74,7 +74,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         } else if(MergeData.isSerializedType(message)) {
             mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
 
-        } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+        } else if(DeleteData.isSerializedType(message)) {
             deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
@@ -129,9 +129,9 @@ public class ShardWriteTransaction extends ShardTransaction {
         modification.addModification(new DeleteModification(message.getPath()));
         try {
             transaction.delete(message.getPath());
-            DeleteDataReply deleteDataReply = new DeleteDataReply();
-            getSender().tell(returnSerialized ? deleteDataReply.toSerializable() : deleteDataReply,
-                getSelf());
+            DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
+            getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
+                deleteDataReply, getSelf());
         }catch(Exception e){
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
index 81bc1026e44e823dcb6485e32673fa52c27eeb84..04bc63c5a5e73506dc9835ae4abf36915372bcee 100644 (file)
@@ -8,15 +8,26 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class DeleteData implements SerializableMessage {
+public class DeleteData implements VersionedSerializableMessage, Externalizable {
+    private static final long serialVersionUID = 1L;
 
-    public static final Class<ShardTransactionMessages.DeleteData> SERIALIZABLE_CLASS = ShardTransactionMessages.DeleteData.class;
+    public static final Class<DeleteData> SERIALIZABLE_CLASS = DeleteData.class;
 
-    private final YangInstanceIdentifier path;
+    private YangInstanceIdentifier path;
+    private short version;
+
+    public DeleteData() {
+    }
 
     public DeleteData(final YangInstanceIdentifier path) {
         this.path = path;
@@ -26,13 +37,46 @@ public class DeleteData implements SerializableMessage {
         return path;
     }
 
-    @Override public Object toSerializable() {
-        return ShardTransactionMessages.DeleteData.newBuilder()
-            .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path)).build();
+    public short getVersion() {
+        return version;
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        version = in.readShort(); // Read the version - don't need to do anything with it now
+        path = SerializationUtils.deserializePath(in);
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeShort(version);
+        SerializationUtils.serializePath(path, out);
+    }
+
+    @Override
+    public Object toSerializable(short toVersion) {
+        if(toVersion >= DataStoreVersions.LITHIUM_VERSION) {
+            version = toVersion;
+            return this;
+        } else {
+            // To base or R1 Helium version
+            return ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments(
+                    InstanceIdentifierUtils.toSerializable(path)).build();
+        }
+    }
+
+    public static DeleteData fromSerializable(final Object serializable) {
+        if(serializable instanceof DeleteData) {
+            return (DeleteData) serializable;
+        } else {
+            // From base or R1 Helium version
+            ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
+            return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+        }
     }
 
-    public static DeleteData fromSerializable(final Object serializable){
-        ShardTransactionMessages.DeleteData o = (ShardTransactionMessages.DeleteData) serializable;
-        return new DeleteData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
+    public static boolean isSerializedType(Object message) {
+        return SERIALIZABLE_CLASS.isInstance(message) ||
+               message instanceof ShardTransactionMessages.DeleteData;
     }
 }
index eba9c39170b4eef45509eb0de9f488172c5f8bfb..0c6ff0e68d69d05e56871d3e197795387310232d 100644 (file)
@@ -10,16 +10,14 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class DeleteDataReply implements SerializableMessage{
-    public static final Class<ShardTransactionMessages.DeleteDataReply> SERIALIZABLE_CLASS =
-            ShardTransactionMessages.DeleteDataReply.class;
+public class DeleteDataReply extends EmptyReply {
 
-    private static final Object SERIALIZED_INSTANCE = ShardTransactionMessages.DeleteDataReply.newBuilder().build();
+    private static final Object LEGACY_SERIALIZED_INSTANCE =
+            ShardTransactionMessages.DeleteDataReply.newBuilder().build();
 
     public static final DeleteDataReply INSTANCE = new DeleteDataReply();
 
-    @Override
-    public Object toSerializable() {
-        return SERIALIZED_INSTANCE;
+    public DeleteDataReply() {
+        super(LEGACY_SERIALIZED_INSTANCE);
     }
 }
index 8404a6e6d81f99788b5dcba2dca5aa1b4ba2c763..3f2125746300f5350ae42be63d10e6b761840a2e 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWrit
  * @author Thomas Pantelis
  */
 public final class SerializationUtils {
+
     public static interface Applier<T> {
         void apply(T instance, YangInstanceIdentifier path, NormalizedNode<?, ?> node);
     }
@@ -78,4 +79,23 @@ public final class SerializationUtils {
 
         return null;
     }
+
+    public static void serializePath(YangInstanceIdentifier path, DataOutput out) {
+        Preconditions.checkNotNull(path);
+        try {
+            NormalizedNodeOutputStreamWriter streamWriter = new NormalizedNodeOutputStreamWriter(out);
+            streamWriter.writeYangInstanceIdentifier(path);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Error serializing path {}", path), e);
+        }
+    }
+
+    public static YangInstanceIdentifier deserializePath(DataInput in) {
+        try {
+            NormalizedNodeInputStreamReader streamReader = new NormalizedNodeInputStreamReader(in);
+            return streamReader.readYangInstanceIdentifier();
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing path", e);
+        }
+    }
 }
index 79480ce5926dbce0be66580eb602ed8592c30bc0..efae106617c2a656ce560aa14de53e1d8c609c9f 100644 (file)
@@ -353,13 +353,14 @@ public class ShardTransactionTest extends AbstractActorTest {
                     DataStoreVersions.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
-            transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
+            transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
+                    DataStoreVersions.HELIUM_2_VERSION), getRef());
 
             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
 
             assertModification(transaction, DeleteModification.class);
 
-            //unserialized merge
+            //unserialized
             transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
 
             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
@@ -433,7 +434,8 @@ public class ShardTransactionTest extends AbstractActorTest {
                 DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
-        transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
+        transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(
+                DataStoreVersions.CURRENT_VERSION), ActorRef.noSender());
     }
 
     @Test
index 79edd19bba3328034ea313baa28333ba398226af..6d80bbb5b158eb2d962a87c0bb6bf08654be2bf8 100644 (file)
@@ -370,8 +370,12 @@ public class TransactionProxyTest {
         return Futures.successful(new MergeDataReply());
     }
 
+    private Future<Object> deleteSerializedDataReply(short version) {
+        return Futures.successful(new DeleteDataReply().toSerializable(version));
+    }
+
     private Future<Object> deleteSerializedDataReply() {
-        return Futures.successful(new DeleteDataReply().toSerializable());
+        return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION);
     }
 
     private Future<DeleteDataReply> deleteDataReply() {
@@ -826,7 +830,7 @@ public class TransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedDeleteData());
 
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                DeleteDataReply.SERIALIZABLE_CLASS);
+                DeleteDataReply.class);
     }
 
     private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/DeleteDataTest.java
new file mode 100644 (file)
index 0000000..e950b78
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages.InstanceIdentifier;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Unit tests for DeleteData.
+ *
+ * @author Thomas Pantelis
+ */
+public class DeleteDataTest {
+
+    @Test
+    public void testSerialization() {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+        DeleteData expected = new DeleteData(path);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.CURRENT_VERSION);
+        assertEquals("Serialized type", DeleteData.class, serialized.getClass());
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)serialized).getVersion());
+
+        Object clone = SerializationUtils.clone((Serializable) serialized);
+        assertEquals("Version", DataStoreVersions.CURRENT_VERSION, ((DeleteData)clone).getVersion());
+        DeleteData actual = DeleteData.fromSerializable(clone);
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+    }
+
+    @Test
+    public void testIsSerializedType() {
+        assertEquals("isSerializedType", true, DeleteData.isSerializedType(
+                ShardTransactionMessages.DeleteData.newBuilder()
+                    .setInstanceIdentifierPathArguments(InstanceIdentifier.getDefaultInstance()).build()));
+        assertEquals("isSerializedType", true,
+                DeleteData.isSerializedType(new DeleteData()));
+        assertEquals("isSerializedType", false, DeleteData.isSerializedType(new Object()));
+    }
+
+    /**
+     * Tests backwards compatible serialization/deserialization of a DeleteData message with the
+     * base and R1 Helium versions, which used the protobuff DeleteData message.
+     */
+    @Test
+    public void testSerializationWithHeliumR1Version() throws Exception {
+        YangInstanceIdentifier path = TestModel.TEST_PATH;
+
+        DeleteData expected = new DeleteData(path);
+
+        Object serialized = expected.toSerializable(DataStoreVersions.HELIUM_1_VERSION);
+        assertEquals("Serialized type", ShardTransactionMessages.DeleteData.class, serialized.getClass());
+
+        DeleteData actual = DeleteData.fromSerializable(SerializationUtils.clone((Serializable) serialized));
+        assertEquals("getPath", expected.getPath(), actual.getPath());
+    }
+}