BUG-5280: eliminate ShardTransactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardWriteTransaction.java
index e1b342a982c89b42955d95ac1007b4486813e810..4ae54d3e5f727856c93a6f896e7630f21e11c10d 100644 (file)
@@ -15,15 +15,8 @@ import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 
 /**
@@ -37,8 +30,8 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final ReadWriteShardDataTreeTransaction transaction;
 
     public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
-            ShardStats shardStats, String transactionID, short clientTxVersion) {
-        super(shardActor, shardStats, transactionID, clientTxVersion);
+            ShardStats shardStats) {
+        super(shardActor, shardStats, transaction.getId());
         this.transaction = transaction;
     }
 
@@ -48,22 +41,9 @@ public class ShardWriteTransaction extends ShardTransaction {
     }
 
     @Override
-    public void handleReceive(Object message) throws Exception {
-
+    public void handleReceive(Object message) {
         if (message instanceof BatchedModifications) {
             batchedModifications((BatchedModifications)message);
-        } else if (message instanceof ReadyTransaction) {
-            readyTransaction(!SERIALIZED_REPLY, false);
-        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(SERIALIZED_REPLY, false);
-        } else if(WriteData.isSerializedType(message)) {
-            writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY);
-
-        } else if(MergeData.isSerializedType(message)) {
-            mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY);
-
-        } else if(DeleteData.isSerializedType(message)) {
-            deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
         } else {
             super.handleReceive(message);
         }
@@ -94,7 +74,7 @@ public class ShardWriteTransaction extends ShardTransaction {
                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
                 }
 
-                readyTransaction(false, batched.isDoCommitOnReady());
+                readyTransaction(false, batched.isDoCommitOnReady(), batched.getVersion());
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
@@ -108,12 +88,12 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    protected final void dataExists(DataExists message, final boolean returnSerialized) {
-        super.dataExists(transaction, message, returnSerialized);
+    protected final void dataExists(DataExists message) {
+        super.dataExists(transaction, message);
     }
 
-    protected final void readData(ReadData message, final boolean returnSerialized) {
-        super.readData(transaction, message, returnSerialized);
+    protected final void readData(ReadData message) {
+        super.readData(transaction, message);
     }
 
     private boolean checkClosed() {
@@ -125,63 +105,13 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void writeData(WriteData message, boolean returnSerialized) {
-        LOG.debug("writeData at path : {}", message.getPath());
-        if (checkClosed()) {
-            return;
-        }
-
-        try {
-            transaction.getSnapshot().write(message.getPath(), message.getData());
-            WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
-            getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
-                writeDataReply, getSelf());
-        }catch(Exception e){
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
-        }
-    }
-
-    private void mergeData(MergeData message, boolean returnSerialized) {
-        LOG.debug("mergeData at path : {}", message.getPath());
-        if (checkClosed()) {
-            return;
-        }
-
-        try {
-            transaction.getSnapshot().merge(message.getPath(), message.getData());
-            MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
-            getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
-                mergeDataReply, getSelf());
-        }catch(Exception e){
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
-        }
-    }
-
-    private void deleteData(DeleteData message, boolean returnSerialized) {
-        LOG.debug("deleteData at path : {}", message.getPath());
-        if (checkClosed()) {
-            return;
-        }
-
-        try {
-            transaction.getSnapshot().delete(message.getPath());
-            DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
-            getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
-                deleteDataReply, getSelf());
-        } catch(Exception e) {
-            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
-        }
-    }
-
-    private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) {
+    private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit, short clientTxVersion) {
         String transactionID = getTransactionID();
 
         LOG.debug("readyTransaction : {}", transactionID);
 
-        ShardDataTreeCohort cohort =  transaction.ready();
-
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
-                cohort, returnSerialized, doImmediateCommit), getContext());
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, clientTxVersion,
+                transaction, doImmediateCommit), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());