BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContext.java
index a25ddc873327accdc31995b160409025b19ce337..20074c10289908d20839fb2fbbd43e0089a7d24c 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -45,28 +46,27 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     private final ActorSelection actor;
     private final boolean isTxActorLocal;
     private final short remoteTransactionVersion;
+    private final OperationLimiter limiter;
 
-    private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
 
-    protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
+    protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
             ActorContext actorContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationCompleter operationCompleter) {
+            short remoteTransactionVersion, OperationLimiter limiter) {
         super(identifier);
+        this.limiter = Preconditions.checkNotNull(limiter);
         this.actor = actor;
         this.actorContext = actorContext;
         this.isTxActorLocal = isTxActorLocal;
         this.remoteTransactionVersion = remoteTransactionVersion;
-        this.operationCompleter = operationCompleter;
     }
 
     private Future<Object> completeOperation(Future<Object> operationFuture){
-        operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
+        operationFuture.onComplete(limiter, actorContext.getClientDispatcher());
         return operationFuture;
     }
 
-
     private ActorSelection getActor() {
         return actor;
     }
@@ -107,6 +107,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
     @Override
     public Future<ActorSelection> readyTransaction() {
+        logModificationCount();
+
         LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
@@ -128,6 +130,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     private void batchModification(Modification modification) {
+        incrementModificationCount();
         if(batchedModifications == null) {
             batchedModifications = newBatchedModifications();
         }
@@ -175,6 +178,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void deleteData(YangInstanceIdentifier path) {
         LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new DeleteModification(path));
     }
 
@@ -182,6 +186,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new MergeModification(path, data));
     }
 
@@ -189,6 +194,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
         LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
 
+        acquireOperation();
         batchModification(new WriteModification(path, data));
     }
 
@@ -201,6 +207,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
+        acquireOperation();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
@@ -243,6 +250,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
+        acquireOperation();
         sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
@@ -273,4 +281,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
+
+    /**
+     * Acquire operation from the limiter if the hand-off has completed. If
+     * the hand-off is still ongoing, this method does nothing.
+     */
+    private final void acquireOperation() {
+        if (isOperationHandOffComplete()) {
+            limiter.acquire();
+        }
+    }
+
+    @Override
+    public boolean usesOperationLimiting() {
+        return true;
+    }
 }