Adjust Tx rate limiter for unused transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 0bc82af3358c4747901fab005db5c19cf2d41288..f1ba4eabb999a6455688d93532877ae490df88df 100644 (file)
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -69,7 +70,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     public static enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
-        READ_WRITE
+        READ_WRITE;
+
+        public static TransactionType fromInt(int type) {
+            if(type == WRITE_ONLY.ordinal()) {
+                return WRITE_ONLY;
+            } else if(type == READ_WRITE.ordinal()) {
+                return READ_WRITE;
+            } else if(type == READ_ONLY.ordinal()) {
+                return READ_ONLY;
+            } else {
+                throw new IllegalArgumentException("In TransactionType enum value" + type);
+            }
+        }
     }
 
     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
@@ -380,6 +393,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         if(txFutureCallbackMap.size() == 0) {
             onTransactionReady(Collections.<Future<ActorSelection>>emptyList());
+            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
         }
 
@@ -425,18 +439,6 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
     }
 
-    /**
-     * Method called to send a CreateTransaction message to a shard.
-     *
-     * @param shard the shard actor to send to
-     * @param serializedCreateMessage the serialized message to send
-     * @return the response Future
-     */
-    protected Future<Object> sendCreateTransaction(ActorSelection shard,
-            Object serializedCreateMessage) {
-        return actorContext.executeOperationAsync(shard, serializedCreateMessage);
-    }
-
     @Override
     public Object getIdentifier() {
         return this.identifier;
@@ -465,14 +467,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
+    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+        return actorContext.findPrimaryShardAsync(shardName);
+    }
+
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
-            Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
+            Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
-            final TransactionFutureCallback newTxFutureCallback =
-                    new TransactionFutureCallback(shardName);
+            final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(shardName);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
@@ -481,7 +486,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 @Override
                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
                     if(failure != null) {
-                        newTxFutureCallback.onComplete(failure, null);
+                        newTxFutureCallback.createTransactionContext(failure, null);
                     } else {
                         newTxFutureCallback.setPrimaryShard(primaryShard);
                     }
@@ -558,10 +563,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Sets the target primary shard and initiates a CreateTransaction try.
          */
         void setPrimaryShard(ActorSelection primaryShard) {
-            LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
             this.primaryShard = primaryShard;
-            tryCreateTransaction();
+
+            if(transactionType == TransactionType.WRITE_ONLY &&
+                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+                LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
+                        identifier, primaryShard);
+
+                // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+                // to avoid the overhead of creating a separate transaction actor.
+                // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+                executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
+                        this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+            } else {
+                tryCreateTransaction();
+            }
         }
 
         /**
@@ -571,7 +587,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             boolean invokeOperation = true;
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
-                    LOG.debug("Tx {} Adding operation on complete {}", identifier);
+                    LOG.debug("Tx {} Adding operation on complete", identifier);
 
                     invokeOperation = false;
                     txOperationsOnComplete.add(operation);
@@ -598,10 +614,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Performs a CreateTransaction try async.
          */
         private void tryCreateTransaction() {
-            Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
-                    new CreateTransaction(identifier.toString(),
-                            TransactionProxy.this.transactionType.ordinal(),
-                            getTransactionChainId()).toSerializable());
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} Primary shard {} found - trying create transaction", identifier, primaryShard);
+            }
+
+            Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
+                    TransactionProxy.this.transactionType.ordinal(),
+                    getTransactionChainId()).toSerializable();
+
+            Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard, serializedCreateMessage);
 
             createTxFuture.onComplete(this, actorContext.getClientDispatcher());
         }
@@ -628,6 +649,10 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
+            createTransactionContext(failure, response);
+        }
+
+        private void createTransactionContext(Throwable failure, Object response) {
             // Mainly checking for state violation here to perform a volatile read of "initialized" to
             // ensure updates to operationLimter et al are visible to this thread (ie we're doing
             // "piggy-back" synchronization here).
@@ -643,8 +668,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // TransactionContext until after we've executed all cached TransactionOperations.
             TransactionContext localTransactionContext;
             if(failure != null) {
-                LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
-                        failure.getMessage());
+                LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
 
                 localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
             } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
@@ -694,11 +718,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
 
         private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
-            String transactionPath = reply.getTransactionPath();
-
             LOG.debug("Tx {} Received {}", identifier, reply);
 
-            ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+            return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
+                    reply.getTransactionPath(), reply.getVersion());
+        }
+
+        private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+                String transactionPath, short remoteTransactionVersion) {
 
             if (transactionType == TransactionType.READ_ONLY) {
                 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
@@ -727,12 +754,17 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
 
-            if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
-                return new TransactionContextImpl(transactionPath, transactionActor, identifier,
-                    actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+            if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
+                return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
+                        transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+                        operationCompleter);
+            } else if (transactionType == TransactionType.WRITE_ONLY &&
+                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+                return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             } else {
-                return new LegacyTransactionContextImpl(transactionPath, transactionActor, identifier,
-                        actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+                return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+                        actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             }
         }
     }