Use BatchedModifications message in place of ReadyTransaction message
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextImpl.java
index b9900889b1125e8ac229f89af9b4f64551209b8b..f34c5a257125026f61f02fcbf18c945160ede2bc 100644 (file)
@@ -14,13 +14,11 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
@@ -93,15 +91,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
-        // Send the remaining batched modifications if any.
+        // Send the remaining batched modifications, if any, with the ready flag set.
 
-        sendBatchedModifications();
-
-        // Send the ReadyTransaction message to the Tx actor.
-
-        Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
 
-        return transformReadyReply(readyReplyFuture);
+        return transformReadyReply(lastModificationsFuture);
     }
 
     protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
@@ -113,33 +107,31 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             public ActorSelection checkedApply(Object serializedReadyReply) {
                 LOG.debug("Tx {} readyTransaction", getIdentifier());
 
-                // At this point the rwady operation succeeded and we need to extract the cohort
+                // At this point the ready operation succeeded and we need to extract the cohort
                 // actor path from the reply.
-                if (serializedReadyReply instanceof ReadyTransactionReply) {
-                    return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply instanceof BatchedModificationsReply) {
-                    return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                    ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                    String cohortPath = deserializeCohortPath(reply.getCohortPath());
-                    return actorContext.actorSelection(cohortPath);
-                } else {
-                    // Throwing an exception here will fail the Future.
-                    throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
-                        getIdentifier(), serializedReadyReply.getClass()));
+                if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+                    ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+                    return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
                 }
+
+                // Throwing an exception here will fail the Future.
+                throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+                        getIdentifier(), serializedReadyReply.getClass()));
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
-    protected String deserializeCohortPath(String cohortPath) {
-        return cohortPath;
+    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+        return readyTxReply.getCohortPath();
+    }
+
+    private BatchedModifications newBatchedModifications() {
+        return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
     }
 
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            batchedModifications = newBatchedModifications();
         }
 
         batchedModifications.addModification(modification);
@@ -156,7 +148,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     protected Future<Object> sendBatchedModifications(boolean ready) {
         Future<Object> sent = null;
-        if(batchedModifications != null) {
+        if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+            if(batchedModifications == null) {
+                batchedModifications = newBatchedModifications();
+            }
+
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                         batchedModifications.getModifications().size(), ready);
@@ -165,8 +161,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             batchedModifications.setReady(ready);
             sent = executeOperationAsync(batchedModifications);
 
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            if(ready) {
+                batchedModifications = null;
+            } else {
+                batchedModifications = newBatchedModifications();
+            }
         }
 
         return sent;