@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void executeModification(final AbstractModification modification) {
+ public void executeModification(final AbstractModification modification, final Boolean havePermit) {
incrementModificationCount();
if (operationError == null) {
try {
}
@Override
- public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture) {
+ public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+ final Boolean havePermit) {
Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
@Override
public void onSuccess(final T result) {
}
@Override
- public Future<ActorSelection> readyTransaction() {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
final LocalThreePhaseCommitCohort cohort = ready();
return cohort.initiateCoordinatedCommit();
}
@Override
- public Future<Object> directCommit() {
+ public Future<Object> directCommit(final Boolean havePermit) {
final LocalThreePhaseCommitCohort cohort = ready();
return cohort.initiateDirectCommit();
}
private final Throwable failure;
- NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+ NoOpTransactionContext(final Throwable failure, final TransactionIdentifier identifier) {
super(identifier);
this.failure = failure;
}
}
@Override
- public Future<Object> directCommit() {
+ public Future<Object> directCommit(final Boolean havePermit) {
LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
return akka.dispatch.Futures.failed(failure);
}
@Override
- public Future<ActorSelection> readyTransaction() {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
return akka.dispatch.Futures.failed(failure);
}
@Override
- public void executeModification(AbstractModification modification) {
+ public void executeModification(final AbstractModification modification, final Boolean havePermit) {
LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
modification.getClass().getSimpleName(), modification.getPath());
}
@Override
- public <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture) {
+ public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+ final Boolean havePermit) {
LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
readCmd.getPath());
*/
private volatile Throwable failedModification;
- protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
- ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
+ protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
+ final ActorContext actorContext, final short remoteTransactionVersion, final OperationLimiter limiter) {
super(identifier, remoteTransactionVersion);
this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
}
@Override
- public Future<Object> directCommit() {
+ public Future<Object> directCommit(final Boolean havePermit) {
LOG.debug("Tx {} directCommit called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
-
+ bumpPermits(havePermit);
return sendBatchedModifications(true, true);
}
@Override
- public Future<ActorSelection> readyTransaction() {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
logModificationCount();
LOG.debug("Tx {} readyTransaction called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
+ bumpPermits(havePermit);
Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
return transformReadyReply(lastModificationsFuture);
}
+ private void bumpPermits(final Boolean havePermit) {
+ if (Boolean.TRUE.equals(havePermit)) {
+ ++batchPermits;
+ }
+ }
+
protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
// Transform the last reply Future into a Future that returns the cohort actor path from
// the last reply message. That's the end result of the ready operation.
return new BatchedModifications(getIdentifier(), getTransactionVersion());
}
- private void batchModification(Modification modification, boolean havePermit) {
+ private void batchModification(final Modification modification, final boolean havePermit) {
incrementModificationCount();
if (havePermit) {
++batchPermits;
return sendBatchedModifications(false, false);
}
- protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
+ protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) {
Future<Object> sent = null;
if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
if (batchedModifications == null) {
actorContext.getTransactionCommitOperationTimeout());
sent.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object success) {
+ public void onComplete(final Throwable failure, final Object success) {
if (failure != null) {
LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
failedModification = failure;
}
@Override
- public void executeModification(AbstractModification modification) {
+ public void executeModification(final AbstractModification modification, final Boolean havePermit) {
LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
modification.getClass().getSimpleName(), modification.getPath());
- final boolean havePermit = failedModification == null && acquireOperation();
- batchModification(modification, havePermit);
+ final boolean permitToRelease;
+ if (havePermit == null) {
+ permitToRelease = failedModification == null && acquireOperation();
+ } else {
+ permitToRelease = havePermit.booleanValue();
+ }
+
+ batchModification(modification, permitToRelease);
}
@Override
- public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture) {
+ public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
+ final Boolean havePermit) {
LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
readCmd.getPath());
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
- final boolean havePermit = acquireOperation();
+ final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) {
+ public void onComplete(final Throwable failure, final Object response) {
// We have previously acquired an operation, now release it, no matter what happened
- if (havePermit) {
+ if (permitToRelease) {
limiter.release();
}
* @return True if a permit was successfully acquired, false otherwise
*/
private boolean acquireOperation() {
- if (isOperationHandOffComplete()) {
- if (limiter.acquire()) {
- return true;
- }
+ Preconditions.checkState(isOperationHandOffComplete(),
+ "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff",
+ getIdentifier(), actor);
- LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(),
- actor);
+ if (limiter.acquire()) {
+ return true;
}
+ LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor);
return false;
}
interface TransactionContext {
void closeTransaction();
- Future<ActorSelection> readyTransaction();
+ Future<ActorSelection> readyTransaction(Boolean havePermit);
- void executeModification(AbstractModification modification);
+ void executeModification(AbstractModification modification, Boolean havePermit);
- <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise);
+ <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise, Boolean havePermit);
- Future<Object> directCommit();
+ Future<Object> directCommit(Boolean havePermit);
/**
* Invoked by {@link TransactionContextWrapper} when it has finished handing
private final TransactionContext cleanup;
- private TransactionContextCleanup(TransactionProxy referent, TransactionContext cleanup) {
+ private TransactionContextCleanup(final TransactionProxy referent, final TransactionContext cleanup) {
super(referent, QUEUE);
this.cleanup = cleanup;
}
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
* The list of transaction operations to execute once the TransactionContext becomes available.
*/
@GuardedBy("queuedTxOperations")
- private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+ private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
private final TransactionIdentifier identifier;
+ private final OperationLimiter limiter;
private final String shardName;
/**
* The resulting TransactionContext.
*/
private volatile TransactionContext transactionContext;
-
- private final OperationLimiter limiter;
+ @GuardedBy("queuedTxOperations")
+ private TransactionContext deferredTransactionContext;
+ @GuardedBy("queuedTxOperations")
+ private boolean pendingEnqueue;
TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
final String shardName) {
}
/**
- * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
+ * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
+ * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
+ * context is not available.
*/
private void enqueueTransactionOperation(final TransactionOperation operation) {
- final boolean invokeOperation;
+ // We have three things to do here:
+ // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
+ // - acquire a permit for the operation if we still need to enqueue it
+ // - enqueue the operation
+ //
+ // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
+ // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
+ // complications are:
+ // - this method may be called from the thread invoking executePriorTransactionOperations()
+ // - user may be violating API contract of using the transaction from a single thread
+
+ // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
+ // the lock, we will assert that we will be enqueing another operation.
+ final TransactionContext contextOnEntry;
synchronized (queuedTxOperations) {
- if (transactionContext == null) {
- LOG.debug("Tx {} Queuing TransactionOperation", identifier);
-
- queuedTxOperations.add(operation);
- invokeOperation = false;
- } else {
- invokeOperation = true;
+ contextOnEntry = transactionContext;
+ if (contextOnEntry == null) {
+ Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
+ identifier);
+ pendingEnqueue = true;
}
}
- if (invokeOperation) {
- operation.invoke(transactionContext);
- } else {
- if (!limiter.acquire()) {
+ // Short-circuit if there is a context
+ if (contextOnEntry != null) {
+ operation.invoke(transactionContext, null);
+ return;
+ }
+
+ boolean cleanupEnqueue = true;
+ TransactionContext finishHandoff = null;
+ try {
+ // Acquire the permit,
+ final boolean havePermit = limiter.acquire();
+ if (!havePermit) {
LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
shardName);
}
+
+ // Ready to enqueue, take the lock again and append the operation
+ synchronized (queuedTxOperations) {
+ LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+ queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
+ pendingEnqueue = false;
+ cleanupEnqueue = false;
+ finishHandoff = deferredTransactionContext;
+ deferredTransactionContext = null;
+ }
+ } finally {
+ if (cleanupEnqueue) {
+ synchronized (queuedTxOperations) {
+ pendingEnqueue = false;
+ finishHandoff = deferredTransactionContext;
+ deferredTransactionContext = null;
+ }
+ }
+ if (finishHandoff != null) {
+ executePriorTransactionOperations(finishHandoff);
+ }
}
}
void maybeExecuteTransactionOperation(final TransactionOperation op) {
-
- if (transactionContext != null) {
- op.invoke(transactionContext);
+ final TransactionContext localContext = transactionContext;
+ if (localContext != null) {
+ op.invoke(localContext, null);
} else {
// The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
// callback to be executed after the Tx is created.
// in case a TransactionOperation results in another transaction operation being
// queued (eg a put operation from a client read Future callback that is notified
// synchronously).
- final Collection<TransactionOperation> operationsBatch;
+ final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
synchronized (queuedTxOperations) {
if (queuedTxOperations.isEmpty()) {
- // We're done invoking the TransactionOperations so we can now publish the
- // TransactionContext.
- localTransactionContext.operationHandOffComplete();
- if (!localTransactionContext.usesOperationLimiting()) {
- limiter.releaseAll();
+ if (!pendingEnqueue) {
+ // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
+ localTransactionContext.operationHandOffComplete();
+ if (!localTransactionContext.usesOperationLimiting()) {
+ limiter.releaseAll();
+ }
+
+ // This is null-to-non-null transition after which we are releasing the lock and not doing
+ // any further processing.
+ transactionContext = localTransactionContext;
+ } else {
+ deferredTransactionContext = localTransactionContext;
}
- transactionContext = localTransactionContext;
- break;
+ return;
}
operationsBatch = new ArrayList<>(queuedTxOperations);
// Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
// A slight down-side is that we need to re-acquire the lock below but this should
// be negligible.
- for (TransactionOperation oper : operationsBatch) {
- oper.invoke(localTransactionContext);
+ for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
+ oper.getKey().invoke(localTransactionContext, oper.getValue());
}
}
}
Future<ActorSelection> readyTransaction() {
// avoid the creation of a promise and a TransactionOperation
- if (transactionContext != null) {
- return transactionContext.readyTransaction();
+ final TransactionContext localContext = transactionContext;
+ if (localContext != null) {
+ return localContext.readyTransaction(null);
}
final Promise<ActorSelection> promise = Futures.promise();
enqueueTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(TransactionContext newTransactionContext) {
- promise.completeWith(newTransactionContext.readyTransaction());
+ public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+ promise.completeWith(newTransactionContext.readyTransaction(havePermit));
}
});
return promise.future();
}
- public OperationLimiter getLimiter() {
+ OperationLimiter getLimiter() {
return limiter;
}
-
-
}
*/
package org.opendaylight.controller.cluster.datastore;
+import org.eclipse.jdt.annotation.Nullable;
+
/**
* Abstract superclass for transaction operations which should be executed
* on a {@link TransactionContext} at a later point in time.
* Execute the delayed operation.
*
* @param transactionContext the TransactionContext
+ * @param havePermit Boolean indicator if this operation has tried and acquired a permit, null if there was no
+ * attempt to acquire a permit.
*/
- protected abstract void invoke(TransactionContext transactionContext);
+ protected abstract void invoke(TransactionContext transactionContext, @Nullable Boolean havePermit);
}
TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(final TransactionContext transactionContext) {
- transactionContext.executeRead(readCmd, proxyFuture);
+ public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+ transactionContext.executeRead(readCmd, proxyFuture, havePermit);
}
});
TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- protected void invoke(final TransactionContext transactionContext) {
- transactionContext.executeModification(modification);
+ protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+ transactionContext.executeModification(modification, havePermit);
}
});
}
for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) {
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(final TransactionContext transactionContext) {
+ public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
transactionContext.closeTransaction();
}
});
final Promise promise = akka.dispatch.Futures.promise();
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(final TransactionContext newTransactionContext) {
- promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef));
+ public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+ promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef,
+ havePermit));
}
});
future = promise.future();
} else {
// avoid the creation of a promise and a TransactionOperation
- future = getDirectCommitFuture(transactionContext, operationCallbackRef);
+ future = getDirectCommitFuture(transactionContext, operationCallbackRef, null);
}
return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier(),
}
private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
- final OperationCallback.Reference operationCallbackRef) {
+ final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) {
TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
txContextFactory.getActorContext());
operationCallbackRef.set(rateLimitingCallback);
rateLimitingCallback.run();
- return transactionContext.directCommit();
+ return transactionContext.directCommit(havePermit);
}
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
public void testWrite() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+ null);
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
}
public void testMerge() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+ null);
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
}
@Test
public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
verify(readWriteTransaction).delete(yangInstanceIdentifier);
}
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction)
.read(yangInstanceIdentifier);
localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
- SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+ SettableFuture.create(), null);
verify(readWriteTransaction).read(yangInstanceIdentifier);
}
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
- SettableFuture.<Boolean>create());
+ SettableFuture.create(), null);
verify(readWriteTransaction).exists(yangInstanceIdentifier);
}
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
- Future<ActorSelection> future = localTransactionContext.readyTransaction();
+ Future<ActorSelection> future = localTransactionContext.readyTransaction(null);
assertTrue(future.isCompleted());
verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+ null);
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+ null);
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+ null);
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+ null);
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
verify(readWriteTransaction).delete(yangInstanceIdentifier);
doReadyWithExpectedError(error);
}
- private void doReadyWithExpectedError(RuntimeException expError) {
+ private void doReadyWithExpectedError(final RuntimeException expError) {
LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
- localTransactionContext.readyTransaction();
+ localTransactionContext.readyTransaction(null);
}
}
*/
@Test
public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
- txContext.executeModification(DELETE);
- txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE, null);
+ txContext.executeModification(DELETE, null);
assertEquals(2, limiter.availablePermits());
Future<Object> future = txContext.sendBatchedModifications();
assertEquals(4, limiter.availablePermits());
// The transaction has failed, no throttling should occur
- txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE, null);
assertEquals(4, limiter.availablePermits());
// Executing a read should result in immediate failure
final SettableFuture<Boolean> readFuture = SettableFuture.create();
- txContext.executeRead(new DataExists(), readFuture);
+ txContext.executeRead(new DataExists(), readFuture, null);
assertTrue(readFuture.isDone());
try {
readFuture.get();
}
});
- future = txContext.directCommit();
+ future = txContext.directCommit(null);
msg = kit.expectMsgClass(BatchedModifications.class);
// Modification should have been thrown away by the dropped transmit induced by executeRead()
*/
@Test
public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
- txContext.executeModification(DELETE);
- txContext.executeModification(DELETE);
- txContext.executeModification(DELETE);
- txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE, null);
+ txContext.executeModification(DELETE, null);
+ txContext.executeModification(DELETE, null);
+ txContext.executeModification(DELETE, null);
assertEquals(0, limiter.availablePermits());
- txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE, null);
// Last acquire should have failed ...
assertEquals(0, limiter.availablePermits());