BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContextSupport.java
index 66562975cc8472af0ebda42d7b7e252b68861bb9..176073ef705cdcd38d714d395f07988fd060b4e9 100644 (file)
@@ -11,7 +11,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
@@ -75,8 +74,8 @@ final class RemoteTransactionContextSupport {
         return parent.getActorContext();
     }
 
-    private Semaphore getOperationLimiter() {
-        return parent.getLimiter();
+    private OperationLimiter getOperationLimiter() {
+        return transactionContextAdapter.getLimiter();
     }
 
     private TransactionIdentifier getIdentifier() {
@@ -86,19 +85,18 @@ final class RemoteTransactionContextSupport {
     /**
      * Sets the target primary shard and initiates a CreateTransaction try.
      */
-    void setPrimaryShard(ActorSelection primaryShard) {
+    void setPrimaryShard(ActorSelection primaryShard, short primaryVersion) {
         this.primaryShard = primaryShard;
 
-        if (getTransactionType() == TransactionType.WRITE_ONLY &&
+        if (getTransactionType() == TransactionType.WRITE_ONLY && primaryVersion >= DataStoreVersions.LITHIUM_VERSION &&
                 getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
             LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
                 getIdentifier(), primaryShard);
 
             // For write-only Tx's we prepare the transaction modifications directly on the shard actor
             // to avoid the overhead of creating a separate transaction actor.
-            // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
             transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
-                    this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+                    this.primaryShard.path().toString(), primaryVersion));
         } else {
             tryCreateTransaction();
         }
@@ -162,7 +160,7 @@ final class RemoteTransactionContextSupport {
         if(failure != null) {
             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
 
-            localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(failure, getIdentifier());
         } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
             localTransactionContext = createValidTransactionContext(
                     CreateTransactionReply.fromSerializable(response));
@@ -170,7 +168,7 @@ final class RemoteTransactionContextSupport {
             IllegalArgumentException exception = new IllegalArgumentException(String.format(
                     "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-            localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
+            localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
         }
 
         transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
@@ -191,14 +189,17 @@ final class RemoteTransactionContextSupport {
         final TransactionContext ret;
 
         if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
-                getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+            ret = new PreLithiumTransactionContextImpl(transactionContextAdapter.getIdentifier(), transactionPath, transactionActor,
+                getActorContext(), isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
         } else {
-            ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
-                isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+            ret = new RemoteTransactionContext(transactionContextAdapter.getIdentifier(), transactionActor, getActorContext(),
+                isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter());
+        }
+
+        if(parent.getType() == TransactionType.READ_ONLY) {
+            TransactionContextCleanup.track(this, ret);
         }
 
-        TransactionContextCleanup.track(this, ret);
         return ret;
     }
 }