Specialize TransactionContextWrapper
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DelayedTransactionContextWrapper.java
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
@@ -19,8 +18,8 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.SortedSet;
-import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.slf4j.Logger;
@@ -29,23 +28,20 @@ import scala.concurrent.Future;
 import scala.concurrent.Promise;
 
 /**
- * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
- * TransactionContext instance are cached until the TransactionContext instance becomes available at which
- * time they are executed.
+ * Delayed implementation of TransactionContextWrapper. Operations destined for the target
+ * TransactionContext instance are cached until the TransactionContext instance becomes
+ * available at which time they are executed.
  *
  * @author Thomas Pantelis
  */
-class TransactionContextWrapper {
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
+final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper {
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class);
 
     /**
      * The list of transaction operations to execute once the TransactionContext becomes available.
      */
     @GuardedBy("queuedTxOperations")
     private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
-    private final TransactionIdentifier identifier;
-    private final OperationLimiter limiter;
-    private final String shardName;
 
     /**
      * The resulting TransactionContext.
@@ -56,22 +52,45 @@ class TransactionContextWrapper {
     @GuardedBy("queuedTxOperations")
     private boolean pendingEnqueue;
 
-    TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils,
-            final String shardName) {
-        this.identifier = requireNonNull(identifier);
-        this.limiter = new OperationLimiter(identifier,
-                // 1 extra permit for the ready operation
-                actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
-                TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
-        this.shardName = requireNonNull(shardName);
+    DelayedTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
+                                     @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
+        super(identifier, actorUtils, shardName);
     }
 
+    @Override
     TransactionContext getTransactionContext() {
         return transactionContext;
     }
 
-    TransactionIdentifier getIdentifier() {
-        return identifier;
+    @Override
+    void maybeExecuteTransactionOperation(final TransactionOperation op) {
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            op.invoke(localContext, null);
+        } else {
+            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+            // callback to be executed after the Tx is created.
+            enqueueTransactionOperation(op);
+        }
+    }
+
+    @Override
+    Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
+        // avoid the creation of a promise and a TransactionOperation
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            return localContext.readyTransaction(null, participatingShardNames);
+        }
+
+        final Promise<ActorSelection> promise = Futures.promise();
+        enqueueTransactionOperation(new TransactionOperation() {
+            @Override
+            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+                promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
+            }
+        });
+
+        return promise.future();
     }
 
     /**
@@ -97,7 +116,7 @@ class TransactionContextWrapper {
         synchronized (queuedTxOperations) {
             contextOnEntry = transactionContext;
             if (contextOnEntry == null) {
-                checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier);
+                checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", getIdentifier());
                 pendingEnqueue = true;
             }
         }
@@ -112,15 +131,15 @@ class TransactionContextWrapper {
         TransactionContext finishHandoff = null;
         try {
             // Acquire the permit,
-            final boolean havePermit = limiter.acquire();
+            final boolean havePermit = getLimiter().acquire();
             if (!havePermit) {
-                LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
-                    shardName);
+                LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(),
+                        getShardName());
             }
 
             // Ready to enqueue, take the lock again and append the operation
             synchronized (queuedTxOperations) {
-                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+                LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
                 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
                 pendingEnqueue = false;
                 cleanupEnqueue = false;
@@ -141,17 +160,6 @@ class TransactionContextWrapper {
         }
     }
 
-    void maybeExecuteTransactionOperation(final TransactionOperation op) {
-        final TransactionContext localContext = transactionContext;
-        if (localContext != null) {
-            op.invoke(localContext, null);
-        } else {
-            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-            // callback to be executed after the Tx is created.
-            enqueueTransactionOperation(op);
-        }
-    }
-
     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
         while (true) {
             // Access to queuedTxOperations and transactionContext must be protected and atomic
@@ -190,32 +198,11 @@ class TransactionContextWrapper {
                 if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
                     // If the context is not using limiting we need to release operations as we are queueing them, so
                     // user threads are not charged for them.
-                    limiter.release();
+                    getLimiter().release();
                 }
                 oper.getKey().invoke(localTransactionContext, permit);
             }
         }
     }
 
-    Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
-        // avoid the creation of a promise and a TransactionOperation
-        final TransactionContext localContext = transactionContext;
-        if (localContext != null) {
-            return localContext.readyTransaction(null, participatingShardNames);
-        }
-
-        final Promise<ActorSelection> promise = Futures.promise();
-        enqueueTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
-                promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
-            }
-        });
-
-        return promise.future();
-    }
-
-    OperationLimiter getLimiter() {
-        return limiter;
-    }
 }