BUG 2676 : Apply backpressure when creating transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardWriteTransaction.java
index 2e43219523e0d34b5b015b0ef0fc39370dbf89b8..a4a2f45fdbdda87cc1166aa0e169214eea0df313 100644 (file)
@@ -23,7 +23,6 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
@@ -74,7 +73,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         } else if(MergeData.isSerializedType(message)) {
             mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
 
-        } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+        } else if(DeleteData.isSerializedType(message)) {
             deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
@@ -82,8 +81,7 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
-            getSender().tell(new GetCompositeModificationReply(
-                    new ImmutableCompositeModification(modification)), getSelf());
+            getSender().tell(new GetCompositeModificationReply(modification), getSelf());
         } else {
             super.handleReceive(message);
         }
@@ -94,7 +92,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         LOG.debug("writeData at path : {}", message.getPath());
 
         modification.addModification(
-                new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
+                new WriteModification(message.getPath(), message.getData()));
         try {
             transaction.write(message.getPath(), message.getData());
             WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
@@ -110,7 +108,7 @@ public class ShardWriteTransaction extends ShardTransaction {
         LOG.debug("mergeData at path : {}", message.getPath());
 
         modification.addModification(
-                new MergeModification(message.getPath(), message.getData(), getSchemaContext()));
+                new MergeModification(message.getPath(), message.getData()));
 
         try {
             transaction.merge(message.getPath(), message.getData());
@@ -129,9 +127,9 @@ public class ShardWriteTransaction extends ShardTransaction {
         modification.addModification(new DeleteModification(message.getPath()));
         try {
             transaction.delete(message.getPath());
-            DeleteDataReply deleteDataReply = new DeleteDataReply();
-            getSender().tell(returnSerialized ? deleteDataReply.toSerializable() : deleteDataReply,
-                getSelf());
+            DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
+            getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
+                deleteDataReply, getSelf());
         }catch(Exception e){
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }