Verify BatchedModifications messages sent vs received
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextImpl.java
index b9900889b1125e8ac229f89af9b4f64551209b8b..c722918c5cfed8ad7062e63911ae60fa45aad7fa 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -14,13 +15,11 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
@@ -47,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
+    private int totalBatchedModificationsSent;
 
     protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
             String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
@@ -93,15 +93,11 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     public Future<ActorSelection> readyTransaction() {
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
-        // Send the remaining batched modifications if any.
+        // Send the remaining batched modifications, if any, with the ready flag set.
 
-        sendBatchedModifications();
-
-        // Send the ReadyTransaction message to the Tx actor.
-
-        Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
 
-        return transformReadyReply(readyReplyFuture);
+        return transformReadyReply(lastModificationsFuture);
     }
 
     protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
@@ -113,33 +109,31 @@ public class TransactionContextImpl extends AbstractTransactionContext {
             public ActorSelection checkedApply(Object serializedReadyReply) {
                 LOG.debug("Tx {} readyTransaction", getIdentifier());
 
-                // At this point the rwady operation succeeded and we need to extract the cohort
+                // At this point the ready operation succeeded and we need to extract the cohort
                 // actor path from the reply.
-                if (serializedReadyReply instanceof ReadyTransactionReply) {
-                    return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply instanceof BatchedModificationsReply) {
-                    return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                    ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                    String cohortPath = deserializeCohortPath(reply.getCohortPath());
-                    return actorContext.actorSelection(cohortPath);
-                } else {
-                    // Throwing an exception here will fail the Future.
-                    throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
-                        getIdentifier(), serializedReadyReply.getClass()));
+                if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+                    ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+                    return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
                 }
+
+                // Throwing an exception here will fail the Future.
+                throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+                        getIdentifier(), serializedReadyReply.getClass()));
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
-    protected String deserializeCohortPath(String cohortPath) {
-        return cohortPath;
+    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+        return readyTxReply.getCohortPath();
+    }
+
+    private BatchedModifications newBatchedModifications() {
+        return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
     }
 
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            batchedModifications = newBatchedModifications();
         }
 
         batchedModifications.addModification(modification);
@@ -156,17 +150,25 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     protected Future<Object> sendBatchedModifications(boolean ready) {
         Future<Object> sent = null;
-        if(batchedModifications != null) {
+        if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+            if(batchedModifications == null) {
+                batchedModifications = newBatchedModifications();
+            }
+
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                         batchedModifications.getModifications().size(), ready);
             }
 
             batchedModifications.setReady(ready);
+            batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
             sent = executeOperationAsync(batchedModifications);
 
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            if(ready) {
+                batchedModifications = null;
+            } else {
+                batchedModifications = newBatchedModifications();
+            }
         }
 
         return sent;