import static java.util.Objects.requireNonNull;
import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
return null;
}
- private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextWrapper) {
+ private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
+ final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
+ LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
+ parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
+
+ updateShardInfo(shardName, primaryShardInfo);
+
+ final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+ try {
+ if (localContext != null) {
+ LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
+ parent.getIdentifier());
+ return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
+ localContext);
+ } else {
+ LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
+ parent.getIdentifier());
+ final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+ transactionContextWrapper, parent, shardName);
+ remote.setPrimaryShard(primaryShardInfo);
+ return transactionContextWrapper;
+ }
+ } finally {
+ onTransactionContextCreated(parent.getIdentifier());
+ }
+ }
+
+ private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
primaryShardInfo.getPrimaryShardActor(), shardName);
updateShardInfo(shardName, primaryShardInfo);
+ final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
try {
- TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
if (localContext != null) {
transactionContextWrapper.executePriorTransactionOperations(localContext);
} else {
- RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextWrapper,
- parent, shardName);
+ final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
+ transactionContextWrapper, parent, shardName);
remote.setPrimaryShard(primaryShardInfo);
}
} finally {
}
}
- private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
- String shardName, TransactionContextWrapper transactionContextWrapper) {
+ private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
+ final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
try {
- transactionContextWrapper.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getIdentifier()));
+ transactionContextWrapper.executePriorTransactionOperations(
+ new NoOpTransactionContext(failure, parent.getIdentifier()));
} finally {
onTransactionContextCreated(parent.getIdentifier());
}
}
- final TransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
+ final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
final String shardName) {
- final TransactionContextWrapper transactionContextWrapper =
- new TransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName);
-
- Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
+ final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
+ parent.getIdentifier(), actorUtils, shardName);
+ final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
if (findPrimaryFuture.isCompleted()) {
- Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
+ final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
if (maybe.isSuccess()) {
- onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper);
+ return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName,
+ contextWrapper);
} else {
- onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper);
+ onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName,
+ contextWrapper);
}
} else {
- findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
- @Override
- public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
- if (failure == null) {
- onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper);
- } else {
- onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper);
- }
+ findPrimaryFuture.onComplete((result) -> {
+ if (result.isSuccess()) {
+ onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
+ } else {
+ onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
}
+ return null;
}, actorUtils.getClientDispatcher());
}
-
- return transactionContextWrapper;
+ return contextWrapper;
}
private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorSelection;
+import java.util.Optional;
+import java.util.SortedSet;
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import scala.concurrent.Future;
+
+/**
+ * A helper class that wraps an eventual TransactionContext instance. We have two specializations:
+ * <ul>
+ * <li>{@link DelayedTransactionContextWrapper}, which enqueues operations towards the backend</li>
+ * <li>{@link DirectTransactionContextWrapper}, which sends operations to the backend</li>
+ * </ul>
+ */
+abstract class AbstractTransactionContextWrapper {
+ private final TransactionIdentifier identifier;
+ private final OperationLimiter limiter;
+ private final String shardName;
+
+ AbstractTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
+ @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
+ this.identifier = requireNonNull(identifier);
+ this.shardName = requireNonNull(shardName);
+ limiter = new OperationLimiter(identifier,
+ // 1 extra permit for the ready operation
+ actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
+ TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
+ }
+
+ final TransactionIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ final OperationLimiter getLimiter() {
+ return limiter;
+ }
+
+ final String getShardName() {
+ return shardName;
+ }
+
+ abstract @Nullable TransactionContext getTransactionContext();
+
+ /**
+ * Either enqueue or execute specified operation.
+ *
+ * @param op Operation to (eventually) execute
+ */
+ abstract void maybeExecuteTransactionOperation(TransactionOperation op);
+
+ /**
+ * Mark the transaction as ready.
+ *
+ * @param participatingShardNames Shards which participate on the transaction
+ * @return Future indicating the transaction has been readied on the backend
+ */
+ abstract @NonNull Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames);
+}
package org.opendaylight.controller.cluster.datastore;
import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.SortedSet;
-import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.slf4j.Logger;
import scala.concurrent.Promise;
/**
- * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
- * TransactionContext instance are cached until the TransactionContext instance becomes available at which
- * time they are executed.
+ * Delayed implementation of TransactionContextWrapper. Operations destined for the target
+ * TransactionContext instance are cached until the TransactionContext instance becomes
+ * available at which time they are executed.
*
* @author Thomas Pantelis
*/
-class TransactionContextWrapper {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
+final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper {
+ private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class);
/**
* The list of transaction operations to execute once the TransactionContext becomes available.
*/
@GuardedBy("queuedTxOperations")
private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
- private final TransactionIdentifier identifier;
- private final OperationLimiter limiter;
- private final String shardName;
/**
* The resulting TransactionContext.
@GuardedBy("queuedTxOperations")
private boolean pendingEnqueue;
- TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils,
- final String shardName) {
- this.identifier = requireNonNull(identifier);
- this.limiter = new OperationLimiter(identifier,
- // 1 extra permit for the ready operation
- actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
- TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
- this.shardName = requireNonNull(shardName);
+ DelayedTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
+ @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
+ super(identifier, actorUtils, shardName);
}
+ @Override
TransactionContext getTransactionContext() {
return transactionContext;
}
- TransactionIdentifier getIdentifier() {
- return identifier;
+ @Override
+ void maybeExecuteTransactionOperation(final TransactionOperation op) {
+ final TransactionContext localContext = transactionContext;
+ if (localContext != null) {
+ op.invoke(localContext, null);
+ } else {
+ // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+ // callback to be executed after the Tx is created.
+ enqueueTransactionOperation(op);
+ }
+ }
+
+ @Override
+ Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
+ // avoid the creation of a promise and a TransactionOperation
+ final TransactionContext localContext = transactionContext;
+ if (localContext != null) {
+ return localContext.readyTransaction(null, participatingShardNames);
+ }
+
+ final Promise<ActorSelection> promise = Futures.promise();
+ enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+ promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
+ }
+ });
+
+ return promise.future();
}
/**
synchronized (queuedTxOperations) {
contextOnEntry = transactionContext;
if (contextOnEntry == null) {
- checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier);
+ checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", getIdentifier());
pendingEnqueue = true;
}
}
TransactionContext finishHandoff = null;
try {
// Acquire the permit,
- final boolean havePermit = limiter.acquire();
+ final boolean havePermit = getLimiter().acquire();
if (!havePermit) {
- LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
- shardName);
+ LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(),
+ getShardName());
}
// Ready to enqueue, take the lock again and append the operation
synchronized (queuedTxOperations) {
- LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+ LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
pendingEnqueue = false;
cleanupEnqueue = false;
}
}
- void maybeExecuteTransactionOperation(final TransactionOperation op) {
- final TransactionContext localContext = transactionContext;
- if (localContext != null) {
- op.invoke(localContext, null);
- } else {
- // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
- // callback to be executed after the Tx is created.
- enqueueTransactionOperation(op);
- }
- }
-
void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
while (true) {
// Access to queuedTxOperations and transactionContext must be protected and atomic
if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
// If the context is not using limiting we need to release operations as we are queueing them, so
// user threads are not charged for them.
- limiter.release();
+ getLimiter().release();
}
oper.getKey().invoke(localTransactionContext, permit);
}
}
}
- Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
- // avoid the creation of a promise and a TransactionOperation
- final TransactionContext localContext = transactionContext;
- if (localContext != null) {
- return localContext.readyTransaction(null, participatingShardNames);
- }
-
- final Promise<ActorSelection> promise = Futures.promise();
- enqueueTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
- promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
- }
- });
-
- return promise.future();
- }
-
- OperationLimiter getLimiter() {
- return limiter;
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static java.util.Objects.requireNonNull;
+
+import akka.actor.ActorSelection;
+import java.util.Optional;
+import java.util.SortedSet;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import scala.concurrent.Future;
+
+/**
+ * Direct implementation of TransactionContextWrapper. Operation are executed directly on TransactionContext. Always
+ * has completed context and executes on local shard.
+ */
+final class DirectTransactionContextWrapper extends AbstractTransactionContextWrapper {
+ private final TransactionContext transactionContext;
+
+ DirectTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
+ @NonNull final ActorUtils actorUtils,
+ @NonNull final String shardName,
+ @NonNull final TransactionContext transactionContext) {
+ super(identifier, actorUtils, shardName);
+ this.transactionContext = requireNonNull(transactionContext);
+ }
+
+ @Override
+ TransactionContext getTransactionContext() {
+ return transactionContext;
+ }
+
+ @Override
+ void maybeExecuteTransactionOperation(final TransactionOperation op) {
+ op.invoke(transactionContext, null);
+ }
+
+ @Override
+ Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
+ return transactionContext.readyTransaction(null, participatingShardNames);
+ }
+}
* <p/>
* The end result from a completed CreateTransaction message is a TransactionContext that is
* used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the
+ * CreateTransaction completes are cached via a DelayedTransactionContextWrapper and executed once the
* CreateTransaction completes, successfully or not.
*/
final class RemoteTransactionContextSupport {
private final Timeout createTxMessageTimeout;
- private final TransactionContextWrapper transactionContextWrapper;
+ private final DelayedTransactionContextWrapper transactionContextWrapper;
- RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper,
+ RemoteTransactionContextSupport(final DelayedTransactionContextWrapper transactionContextWrapper,
final TransactionProxy parent, final String shardName) {
this.parent = requireNonNull(parent);
this.shardName = shardName;
localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
}
-
transactionContextWrapper.executePriorTransactionOperations(localTransactionContext);
}
Future<Object> directCommit(Boolean havePermit);
/**
- * Invoked by {@link TransactionContextWrapper} when it has finished handing
+ * Invoked by {@link AbstractTransactionContextWrapper} when it has finished handing
* off operations to this context. From this point on, the context is responsible
* for throttling operations.
*
private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.empty());
- private final Map<String, TransactionContextWrapper> txContextWrappers = new TreeMap<>();
+ private final Map<String, AbstractTransactionContextWrapper> txContextWrappers = new TreeMap<>();
private final AbstractTransactionContextFactory<?> txContextFactory;
private final TransactionType type;
private TransactionState state = TransactionState.OPEN;
LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
final SettableFuture<T> proxyFuture = SettableFuture.create();
- TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
+ AbstractTransactionContextWrapper contextWrapper = getContextWrapper(shardName);
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
return;
}
- for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) {
+ for (AbstractTransactionContextWrapper contextWrapper : txContextWrappers.values()) {
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
break;
case 1:
- final Entry<String, TransactionContextWrapper> e = Iterables.getOnlyElement(
+ final Entry<String, AbstractTransactionContextWrapper> e = Iterables.getOnlyElement(
txContextWrappers.entrySet());
ret = createSingleCommitCohort(e.getKey(), e.getValue());
break;
@SuppressWarnings({ "rawtypes", "unchecked" })
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
- final TransactionContextWrapper contextWrapper) {
+ final AbstractTransactionContextWrapper contextWrapper) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrappers.size());
final Optional<SortedSet<String>> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet()));
- for (Entry<String, TransactionContextWrapper> e : txContextWrappers.entrySet()) {
+ for (Entry<String, AbstractTransactionContextWrapper> e : txContextWrappers.entrySet()) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
- final TransactionContextWrapper wrapper = e.getValue();
+ final AbstractTransactionContextWrapper wrapper = e.getValue();
// The remote tx version is obtained the via TransactionContext which may not be available yet so
// we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
}
- private TransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
+ private AbstractTransactionContextWrapper getContextWrapper(final YangInstanceIdentifier path) {
return getContextWrapper(shardNameFromIdentifier(path));
}
- private TransactionContextWrapper getContextWrapper(final String shardName) {
- final TransactionContextWrapper existing = txContextWrappers.get(shardName);
+ private AbstractTransactionContextWrapper getContextWrapper(final String shardName) {
+ final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
if (existing != null) {
return existing;
}
- final TransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName);
+ final AbstractTransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName);
txContextWrappers.put(shardName, fresh);
return fresh;
}
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class TransactionContextWrapperTest {
+public class DelayedTransactionContextWrapperTest {
@Mock
private ActorUtils actorUtils;
@Mock
private TransactionContext transactionContext;
- private TransactionContextWrapper transactionContextWrapper;
+ private DelayedTransactionContextWrapper transactionContextWrapper;
@Before
public void setUp() {
doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
- transactionContextWrapper = new TransactionContextWrapper(MockIdentifiers.transactionIdentifier(
- TransactionContextWrapperTest.class, "mock"), actorUtils, "mock");
+ transactionContextWrapper = new DelayedTransactionContextWrapper(MockIdentifiers.transactionIdentifier(
+ DelayedTransactionContextWrapperTest.class, "mock"), actorUtils, "mock");
}
@Test
--- /dev/null
+/*
+ * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class DirectTransactionContextWrapperTest {
+ @Mock
+ private ActorUtils actorUtils;
+
+ @Mock
+ private TransactionContext transactionContext;
+
+ @Mock
+ private TransactionOperation transactionOperation;
+
+ private DirectTransactionContextWrapper contextWrapper;
+
+ @Before
+ public void setUp() {
+ doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
+ contextWrapper = new DirectTransactionContextWrapper(MockIdentifiers.transactionIdentifier(
+ DirectTransactionContextWrapperTest.class, "mock"), actorUtils, "mock",
+ transactionContext);
+ }
+
+ @Test
+ public void testMaybeExecuteTransactionOperation() {
+ contextWrapper.maybeExecuteTransactionOperation(transactionOperation);
+ verify(transactionOperation, times(1)).invoke(transactionContext, null);
+ }
+}