Adjust Tx rate limiter for unused transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 64b9086c250c16f759a417003d0cefc9839f688b..f1ba4eabb999a6455688d93532877ae490df88df 100644 (file)
@@ -70,7 +70,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public static enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
-        READ_WRITE
+        READ_WRITE;
+
+        public static TransactionType fromInt(int type) {
+            if(type == WRITE_ONLY.ordinal()) {
+                return WRITE_ONLY;
+            } else if(type == READ_WRITE.ordinal()) {
+                return READ_WRITE;
+            } else if(type == READ_ONLY.ordinal()) {
+                return READ_ONLY;
+            } else {
+                throw new IllegalArgumentException("In TransactionType enum value" + type);
+            }
+        }
     }
 
     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
@@ -381,6 +393,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         if(txFutureCallbackMap.size() == 0) {
             onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
         }
 
@@ -473,7 +486,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 @Override
                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
                     if(failure != null) {
-                        newTxFutureCallback.onComplete(failure, null);
+                        newTxFutureCallback.createTransactionContext(failure, null);
                     } else {
                         newTxFutureCallback.setPrimaryShard(primaryShard);
                     }
@@ -550,10 +563,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Sets the target primary shard and initiates a CreateTransaction try.
          */
         void setPrimaryShard(ActorSelection primaryShard) {
-            LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
             this.primaryShard = primaryShard;
-            tryCreateTransaction();
+
+            if(transactionType == TransactionType.WRITE_ONLY &&
+                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+                LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+                        identifier, primaryShard);
+
+                // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+                // to avoid the overhead of creating a separate transaction actor.
+                // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+                executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
+                        this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+            } else {
+                tryCreateTransaction();
+            }
         }
 
         /**
@@ -563,7 +587,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean invokeOperation = true;
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
-                    LOG.debug("Tx {} Adding operation on complete {}", identifier);
+                    LOG.debug("Tx {} Adding operation on complete", identifier);
 
                     invokeOperation = false;
                     txOperationsOnComplete.add(operation);
@@ -590,6 +614,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+            }
+
             Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
                     TransactionProxy.this.transactionType.ordinal(),
                     getTransactionChainId()).toSerializable();
@@ -621,6 +649,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
+            createTransactionContext(failure, response);
+        }
+
+        private void createTransactionContext(Throwable failure, Object response) {
             // Mainly checking for state violation here to perform a volatile read of "initialized" to
             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
             // "piggy-back" synchronization here).
@@ -636,8 +668,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // TransactionContext until after we've executed all cached TransactionOperations.
             TransactionContext localTransactionContext;
             if(failure != null) {
-                LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
-                        failure.getMessage());
+                LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
 
                 localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
             } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
@@ -687,11 +718,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
-            String transactionPath = reply.getTransactionPath();
-
             LOG.debug("Tx {} Received {}", identifier, reply);
 
-            ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+            return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
+                    reply.getTransactionPath(), reply.getVersion());
+        }
+
+        private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+                String transactionPath, short remoteTransactionVersion) {
 
             if (transactionType == TransactionType.READ_ONLY) {
                 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
@@ -720,12 +754,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
-                return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                    actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
-            } else {
+            if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
-                        actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+                        transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+                        operationCompleter);
+            } else if (transactionType == TransactionType.WRITE_ONLY &&
+                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+                return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+            } else {
+                return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+                        actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             }
         }
     }