Raise EOS unsuccessful request reporting to error
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
index 0e1260962d37d807f53233729f3f999e996faed9..2facfbd1a20a29a7d65719c4fba31ee0c4875977 100644 (file)
@@ -7,18 +7,22 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
-import com.google.common.base.Preconditions;
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -52,14 +56,14 @@ class TransactionContextWrapper {
     @GuardedBy("queuedTxOperations")
     private boolean pendingEnqueue;
 
-    TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
+    TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils,
             final String shardName) {
-        this.identifier = Preconditions.checkNotNull(identifier);
+        this.identifier = requireNonNull(identifier);
         this.limiter = new OperationLimiter(identifier,
                 // 1 extra permit for the ready operation
-                actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
-                TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
-        this.shardName = Preconditions.checkNotNull(shardName);
+                actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
+                TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
+        this.shardName = requireNonNull(shardName);
     }
 
     TransactionContext getTransactionContext() {
@@ -93,8 +97,7 @@ class TransactionContextWrapper {
         synchronized (queuedTxOperations) {
             contextOnEntry = transactionContext;
             if (contextOnEntry == null) {
-                Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
-                        identifier);
+                checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier);
                 pendingEnqueue = true;
             }
         }
@@ -166,9 +169,6 @@ class TransactionContextWrapper {
                     if (!pendingEnqueue) {
                         // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
                         localTransactionContext.operationHandOffComplete();
-                        if (!localTransactionContext.usesOperationLimiting()) {
-                            limiter.releaseAll();
-                        }
 
                         // This is null-to-non-null transition after which we are releasing the lock and not doing
                         // any further processing.
@@ -183,27 +183,32 @@ class TransactionContextWrapper {
                 queuedTxOperations.clear();
             }
 
-            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
-            // A slight down-side is that we need to re-acquire the lock below but this should
-            // be negligible.
+            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
+            // that we need to re-acquire the lock below but this should be negligible.
             for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
-                oper.getKey().invoke(localTransactionContext, oper.getValue());
+                final Boolean permit = oper.getValue();
+                if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
+                    // If the context is not using limiting we need to release operations as we are queueing them, so
+                    // user threads are not charged for them.
+                    limiter.release();
+                }
+                oper.getKey().invoke(localTransactionContext, permit);
             }
         }
     }
 
-    Future<ActorSelection> readyTransaction() {
+    Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
         // avoid the creation of a promise and a TransactionOperation
         final TransactionContext localContext = transactionContext;
         if (localContext != null) {
-            return localContext.readyTransaction(null);
+            return localContext.readyTransaction(null, participatingShardNames);
         }
 
         final Promise<ActorSelection> promise = Futures.promise();
         enqueueTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
-                promise.completeWith(newTransactionContext.readyTransaction(havePermit));
+                promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
             }
         });