Specialize TransactionContextWrapper 64/90864/56
authortadei.bilan <tadei.bilan@pantheon.tech>
Thu, 2 Jul 2020 10:48:54 +0000 (13:48 +0300)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 21 Sep 2021 19:07:31 +0000 (21:07 +0200)
Most of the time we are talking to local leader, in which case
we do not need to queue messages and bounce them through the queue.

JIRA: CONTROLLER-1952
Change-Id: I07d85c82c2ab6e4251c70b2e6d1dafa2dc455d39
Signed-off-by: tadei.bilan <tadei.bilan@pantheon.tech>
Signed-off-by: Ivan Hrasko <ivan.hrasko@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapper.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java with 84% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapperTest.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java with 82% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapperTest.java [new file with mode: 0644]

index 6d573dedf53223b34c8c2ca525bea3435d9bc812..211ebec63c3074e9199d9c034290928495451b91 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -80,20 +79,48 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         return null;
     }
 
-    private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
-            String shardName, TransactionContextWrapper transactionContextWrapper) {
+    private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
+            final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+            final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
+        LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
+                parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
+
+        updateShardInfo(shardName, primaryShardInfo);
+
+        final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+        try {
+            if (localContext != null) {
+                LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
+                        parent.getIdentifier());
+                return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
+                        localContext);
+            } else {
+                LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
+                        parent.getIdentifier());
+                final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+                        transactionContextWrapper, parent, shardName);
+                remote.setPrimaryShard(primaryShardInfo);
+                return transactionContextWrapper;
+            }
+        } finally {
+            onTransactionContextCreated(parent.getIdentifier());
+        }
+    }
+
+    private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+            final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
         LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
                 primaryShardInfo.getPrimaryShardActor(), shardName);
 
         updateShardInfo(shardName, primaryShardInfo);
 
+        final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
         try {
-            TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
             if (localContext != null) {
                 transactionContextWrapper.executePriorTransactionOperations(localContext);
             } else {
-                RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
-                        parent, shardName);
+                final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+                        transactionContextWrapper, parent, shardName);
                 remote.setPrimaryShard(primaryShardInfo);
             }
         } finally {
@@ -101,45 +128,43 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
         }
     }
 
-    private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
-            String shardName, TransactionContextWrapper transactionContextWrapper) {
+    private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
+            final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
         LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
 
         try {
-            transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
-                    parent.getIdentifier()));
+            transactionContextWrapper.executePriorTransactionOperations(
+                    new NoOpTransactionContext(failure, parent.getIdentifier()));
         } finally {
             onTransactionContextCreated(parent.getIdentifier());
         }
     }
 
-    final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
+    final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
             final String shardName) {
-        final TransactionContextWrapper transactionContextWrapper =
-                new TransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName);
-
-        Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
+        final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
+                parent.getIdentifier(), actorUtils, shardName);
+        final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
         if (findPrimaryFuture.isCompleted()) {
-            Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
+            final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
             if (maybe.isSuccess()) {
-                onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
+                return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName,
+                        contextWrapper);
             } else {
-                onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
+                onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName,
+                        contextWrapper);
             }
         } else {
-            findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
-                @Override
-                public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
-                    if (failure == null) {
-                        onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
-                    } else {
-                        onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
-                    }
+            findPrimaryFuture.onComplete((result) -> {
+                if (result.isSuccess()) {
+                    onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
+                } else {
+                    onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
                 }
+                return null;
             }, actorUtils.getClientDispatcher());
         }
-
-        return transactionContextWrapper;
+        return contextWrapper;
     }
 
     private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java
new file mode 100644 (file)
index 0000000..49dac87
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorSelection;
+import java.util.Optional;
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import scala.concurrent.Future;
+
+/**
+ * A helper class that wraps an eventual TransactionContext instance. We have two specializations:
+ * <ul>
+ *   <li>{@link DelayedTransactionContextWrapper}, which enqueues operations towards the backend</li>
+ *   <li>{@link DirectTransactionContextWrapper}, which sends operations to the backend</li>
+ * </ul>
+ */
+abstract class AbstractTransactionContextWrapper {
+    private final TransactionIdentifier identifier;
+    private final OperationLimiter limiter;
+    private final String shardName;
+
+    AbstractTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
+                                      @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
+        this.identifier = requireNonNull(identifier);
+        this.shardName = requireNonNull(shardName);
+        limiter = new OperationLimiter(identifier,
+            // 1 extra permit for the ready operation
+            actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
+            TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
+    }
+
+    final TransactionIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    final OperationLimiter getLimiter() {
+        return limiter;
+    }
+
+    final String getShardName() {
+        return shardName;
+    }
+
+    abstract @Nullable TransactionContext getTransactionContext();
+
+    /**
+     * Either enqueue or execute specified operation.
+     *
+     * @param op Operation to (eventually) execute
+     */
+    abstract void maybeExecuteTransactionOperation(TransactionOperation op);
+
+    /**
+     * Mark the transaction as ready.
+     *
+     * @param participatingShardNames Shards which participate on the transaction
+     * @return Future indicating the transaction has been readied on the backend
+     */
+    abstract @NonNull Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames);
+}
@@ -8,7 +8,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
 
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
@@ -19,8 +18,8 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.SortedSet;
-import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.slf4j.Logger;
@@ -29,23 +28,20 @@ import scala.concurrent.Future;
 import scala.concurrent.Promise;
 
 /**
- * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
- * TransactionContext instance are cached until the TransactionContext instance becomes available at which
- * time they are executed.
+ * Delayed implementation of TransactionContextWrapper. Operations destined for the target
+ * TransactionContext instance are cached until the TransactionContext instance becomes
+ * available at which time they are executed.
  *
  * @author Thomas Pantelis
  */
-class TransactionContextWrapper {
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
+final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper {
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class);
 
     /**
      * The list of transaction operations to execute once the TransactionContext becomes available.
      */
     @GuardedBy("queuedTxOperations")
     private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
-    private final TransactionIdentifier identifier;
-    private final OperationLimiter limiter;
-    private final String shardName;
 
     /**
      * The resulting TransactionContext.
@@ -56,22 +52,45 @@ class TransactionContextWrapper {
     @GuardedBy("queuedTxOperations")
     private boolean pendingEnqueue;
 
-    TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils,
-            final String shardName) {
-        this.identifier = requireNonNull(identifier);
-        this.limiter = new OperationLimiter(identifier,
-                // 1 extra permit for the ready operation
-                actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
-                TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
-        this.shardName = requireNonNull(shardName);
+    DelayedTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
+                                     @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
+        super(identifier, actorUtils, shardName);
     }
 
+    @Override
     TransactionContext getTransactionContext() {
         return transactionContext;
     }
 
-    TransactionIdentifier getIdentifier() {
-        return identifier;
+    @Override
+    void maybeExecuteTransactionOperation(final TransactionOperation op) {
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            op.invoke(localContext, null);
+        } else {
+            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+            // callback to be executed after the Tx is created.
+            enqueueTransactionOperation(op);
+        }
+    }
+
+    @Override
+    Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
+        // avoid the creation of a promise and a TransactionOperation
+        final TransactionContext localContext = transactionContext;
+        if (localContext != null) {
+            return localContext.readyTransaction(null, participatingShardNames);
+        }
+
+        final Promise<ActorSelection> promise = Futures.promise();
+        enqueueTransactionOperation(new TransactionOperation() {
+            @Override
+            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+                promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
+            }
+        });
+
+        return promise.future();
     }
 
     /**
@@ -97,7 +116,7 @@ class TransactionContextWrapper {
         synchronized (queuedTxOperations) {
             contextOnEntry = transactionContext;
             if (contextOnEntry == null) {
-                checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier);
+                checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", getIdentifier());
                 pendingEnqueue = true;
             }
         }
@@ -112,15 +131,15 @@ class TransactionContextWrapper {
         TransactionContext finishHandoff = null;
         try {
             // Acquire the permit,
-            final boolean havePermit = limiter.acquire();
+            final boolean havePermit = getLimiter().acquire();
             if (!havePermit) {
-                LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
-                    shardName);
+                LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(),
+                        getShardName());
             }
 
             // Ready to enqueue, take the lock again and append the operation
             synchronized (queuedTxOperations) {
-                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+                LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
                 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
                 pendingEnqueue = false;
                 cleanupEnqueue = false;
@@ -141,17 +160,6 @@ class TransactionContextWrapper {
         }
     }
 
-    void maybeExecuteTransactionOperation(final TransactionOperation op) {
-        final TransactionContext localContext = transactionContext;
-        if (localContext != null) {
-            op.invoke(localContext, null);
-        } else {
-            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-            // callback to be executed after the Tx is created.
-            enqueueTransactionOperation(op);
-        }
-    }
-
     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
         while (true) {
             // Access to queuedTxOperations and transactionContext must be protected and atomic
@@ -190,32 +198,11 @@ class TransactionContextWrapper {
                 if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
                     // If the context is not using limiting we need to release operations as we are queueing them, so
                     // user threads are not charged for them.
-                    limiter.release();
+                    getLimiter().release();
                 }
                 oper.getKey().invoke(localTransactionContext, permit);
             }
         }
     }
 
-    Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
-        // avoid the creation of a promise and a TransactionOperation
-        final TransactionContext localContext = transactionContext;
-        if (localContext != null) {
-            return localContext.readyTransaction(null, participatingShardNames);
-        }
-
-        final Promise<ActorSelection> promise = Futures.promise();
-        enqueueTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
-                promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
-            }
-        });
-
-        return promise.future();
-    }
-
-    OperationLimiter getLimiter() {
-        return limiter;
-    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java
new file mode 100644 (file)
index 0000000..f004088
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorSelection;
+import java.util.Optional;
+import java.util.SortedSet;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import scala.concurrent.Future;
+
+/**
+ * Direct implementation of TransactionContextWrapper. Operation are executed directly on TransactionContext. Always
+ * has completed context and executes on local shard.
+ */
+final class DirectTransactionContextWrapper extends AbstractTransactionContextWrapper {
+    private final TransactionContext transactionContext;
+
+    DirectTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
+                                    @NonNull final ActorUtils actorUtils,
+                                    @NonNull final String shardName,
+                                    @NonNull final TransactionContext transactionContext) {
+        super(identifier, actorUtils, shardName);
+        this.transactionContext = requireNonNull(transactionContext);
+    }
+
+    @Override
+    TransactionContext getTransactionContext() {
+        return transactionContext;
+    }
+
+    @Override
+    void maybeExecuteTransactionOperation(final TransactionOperation op) {
+        op.invoke(transactionContext, null);
+    }
+
+    @Override
+    Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
+        return transactionContext.readyTransaction(null, participatingShardNames);
+    }
+}
index 4276f3be52960e0ff9c68cd50adc5d9e591c4db6..333d11b4f05ce03e4bca41e1f7727dc5e31fe279 100644 (file)
@@ -35,7 +35,7 @@ import scala.concurrent.duration.FiniteDuration;
  * <p/>
  * The end result from a completed CreateTransaction message is a TransactionContext that is
  * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the
+ * CreateTransaction completes are cached via a DelayedTransactionContextWrapper and executed once the
  * CreateTransaction completes, successfully or not.
  */
 final class RemoteTransactionContextSupport {
@@ -59,9 +59,9 @@ final class RemoteTransactionContextSupport {
 
     private final Timeout createTxMessageTimeout;
 
-    private final TransactionContextWrapper transactionContextWrapper;
+    private final DelayedTransactionContextWrapper transactionContextWrapper;
 
-    RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper,
+    RemoteTransactionContextSupport(final DelayedTransactionContextWrapper transactionContextWrapper,
             final TransactionProxy parent, final String shardName) {
         this.parent = requireNonNull(parent);
         this.shardName = shardName;
@@ -231,7 +231,6 @@ final class RemoteTransactionContextSupport {
 
             localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
         }
-
         transactionContextWrapper.executePriorTransactionOperations(localTransactionContext);
     }
 
index cafc2a82eabcf01b18bd5bc1074fa2e412facf7c..c96233ccebbff6f04f4b59b18e771ea75aabac43 100644 (file)
@@ -36,7 +36,7 @@ interface TransactionContext {
     Future<Object> directCommit(Boolean havePermit);
 
     /**
-     * Invoked by {@link TransactionContextWrapper} when it has finished handing
+     * Invoked by {@link AbstractTransactionContextWrapper} when it has finished handing
      * off operations to this context. From this point on, the context is responsible
      * for throttling operations.
      *
index ce0461ffaf45b98c2db34fd28a696c02209b39a8..16a979fa6a1176b17f1d449d23de04de6bc58f1f 100644 (file)
@@ -69,7 +69,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
     private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.empty());
 
-    private final Map<String, TransactionContextWrapper> txContextWrappers = new TreeMap<>();
+    private final Map<String, AbstractTransactionContextWrapper> txContextWrappers = new TreeMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
     private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
@@ -95,7 +95,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
 
         final SettableFuture<T> proxyFuture = SettableFuture.create();
-        TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
+        AbstractTransactionContextWrapper contextWrapper = getContextWrapper(shardName);
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
@@ -253,7 +253,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             return;
         }
 
-        for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) {
+        for (AbstractTransactionContextWrapper contextWrapper : txContextWrappers.values()) {
             contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
                 @Override
                 public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
@@ -281,7 +281,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                 ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
                 break;
             case 1:
-                final Entry<String, TransactionContextWrapper> e = Iterables.getOnlyElement(
+                final Entry<String, AbstractTransactionContextWrapper> e = Iterables.getOnlyElement(
                         txContextWrappers.entrySet());
                 ret = createSingleCommitCohort(e.getKey(), e.getValue());
                 break;
@@ -297,7 +297,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
-            final TransactionContextWrapper contextWrapper) {
+            final AbstractTransactionContextWrapper contextWrapper) {
 
         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
@@ -338,10 +338,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrappers.size());
         final Optional<SortedSet<String>> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet()));
-        for (Entry<String, TransactionContextWrapper> e : txContextWrappers.entrySet()) {
+        for (Entry<String, AbstractTransactionContextWrapper> e : txContextWrappers.entrySet()) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
-            final TransactionContextWrapper wrapper = e.getValue();
+            final AbstractTransactionContextWrapper wrapper = e.getValue();
 
             // The remote tx version is obtained the via TransactionContext which may not be available yet so
             // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
@@ -361,17 +361,17 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
     }
 
-    private TransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
+    private AbstractTransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
         return getContextWrapper(shardNameFromIdentifier(path));
     }
 
-    private TransactionContextWrapper getContextWrapper(final String shardName) {
-        final TransactionContextWrapper existing = txContextWrappers.get(shardName);
+    private AbstractTransactionContextWrapper getContextWrapper(final String shardName) {
+        final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
         if (existing != null) {
             return existing;
         }
 
-        final TransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName);
+        final AbstractTransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName);
         txContextWrappers.put(shardName, fresh);
         return fresh;
     }
@@ -19,20 +19,20 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class TransactionContextWrapperTest {
+public class DelayedTransactionContextWrapperTest {
     @Mock
     private ActorUtils actorUtils;
 
     @Mock
     private TransactionContext transactionContext;
 
-    private TransactionContextWrapper transactionContextWrapper;
+    private DelayedTransactionContextWrapper transactionContextWrapper;
 
     @Before
     public void setUp() {
         doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
-        transactionContextWrapper = new TransactionContextWrapper(MockIdentifiers.transactionIdentifier(
-            TransactionContextWrapperTest.class, "mock"), actorUtils, "mock");
+        transactionContextWrapper = new DelayedTransactionContextWrapper(MockIdentifiers.transactionIdentifier(
+            DelayedTransactionContextWrapperTest.class, "mock"), actorUtils, "mock");
     }
 
     @Test
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapperTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapperTest.java
new file mode 100644 (file)
index 0000000..44f246f
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class DirectTransactionContextWrapperTest {
+    @Mock
+    private ActorUtils actorUtils;
+
+    @Mock
+    private TransactionContext transactionContext;
+
+    @Mock
+    private TransactionOperation transactionOperation;
+
+    private DirectTransactionContextWrapper contextWrapper;
+
+    @Before
+    public void setUp() {
+        doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
+        contextWrapper = new DirectTransactionContextWrapper(MockIdentifiers.transactionIdentifier(
+                DirectTransactionContextWrapperTest.class, "mock"), actorUtils, "mock",
+                transactionContext);
+    }
+
+    @Test
+    public void testMaybeExecuteTransactionOperation() {
+        contextWrapper.maybeExecuteTransactionOperation(transactionOperation);
+        verify(transactionOperation, times(1)).invoke(transactionContext, null);
+    }
+}