The fix for CONTROLLER-1814 wrecked the fragile OperationLimiter
accounting between TransactionContextWrapper and
RemoteTransactionContext. The problem is that during initial
connect time TransactionContextWrapper acquires limiter operations
and the state of the OperationLimiter is inherited by
RemoteTransactionContext. This though means that we need to know
which operation actually acquired a permit and which did not.
As it turns out, this needs to make TransactionContext methods take
an additional hint as to whether a limit attempt was made and what
was the result of it.
Furthermore the internal TransactionContextWrapper logic needs to
be changed from 'enqueue and wait' to 'wait and enqueue' strategy,
so when an operation is added to the queue and potentially picked
up by executePriorTransactionOperations() we already know whether
a permit was acquired or not.
JIRA: CONTROLLER-1823
Change-Id: I78d43a1abde8c6da6e3da2f56823bba130499133
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
b69500a51978c3d3ef639345a1a97a58cc3f6bb8)
(cherry picked from commit
dc295d9be77748d7e695d003a02d299d493abc8d)
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void executeModification(final AbstractModification modification) {
+ public void executeModification(final AbstractModification modification, final Boolean havePermit) {
incrementModificationCount();
if (operationError == null) {
try {
incrementModificationCount();
if (operationError == null) {
try {
- public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture) {
+ public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+ final Boolean havePermit) {
Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
@Override
public void onSuccess(final T result) {
Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
@Override
public void onSuccess(final T result) {
- public Future<ActorSelection> readyTransaction() {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
final LocalThreePhaseCommitCohort cohort = ready();
return cohort.initiateCoordinatedCommit();
}
@Override
final LocalThreePhaseCommitCohort cohort = ready();
return cohort.initiateCoordinatedCommit();
}
@Override
- public Future<Object> directCommit() {
+ public Future<Object> directCommit(final Boolean havePermit) {
final LocalThreePhaseCommitCohort cohort = ready();
return cohort.initiateDirectCommit();
}
final LocalThreePhaseCommitCohort cohort = ready();
return cohort.initiateDirectCommit();
}
private final Throwable failure;
private final Throwable failure;
- NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) {
+ NoOpTransactionContext(final Throwable failure, final TransactionIdentifier identifier) {
super(identifier);
this.failure = failure;
}
super(identifier);
this.failure = failure;
}
- public Future<Object> directCommit() {
+ public Future<Object> directCommit(final Boolean havePermit) {
LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
return akka.dispatch.Futures.failed(failure);
}
@Override
LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
return akka.dispatch.Futures.failed(failure);
}
@Override
- public Future<ActorSelection> readyTransaction() {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
return akka.dispatch.Futures.failed(failure);
}
@Override
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
return akka.dispatch.Futures.failed(failure);
}
@Override
- public void executeModification(AbstractModification modification) {
+ public void executeModification(final AbstractModification modification, final Boolean havePermit) {
LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
modification.getClass().getSimpleName(), modification.getPath());
}
@Override
LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
modification.getClass().getSimpleName(), modification.getPath());
}
@Override
- public <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture) {
+ public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+ final Boolean havePermit) {
LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
readCmd.getPath());
LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
readCmd.getPath());
*/
private volatile Throwable failedModification;
*/
private volatile Throwable failedModification;
- protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor,
- ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) {
+ protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
+ final ActorContext actorContext, final short remoteTransactionVersion, final OperationLimiter limiter) {
super(identifier, remoteTransactionVersion);
this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
super(identifier, remoteTransactionVersion);
this.limiter = Preconditions.checkNotNull(limiter);
this.actor = actor;
- public Future<Object> directCommit() {
+ public Future<Object> directCommit(final Boolean havePermit) {
LOG.debug("Tx {} directCommit called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
LOG.debug("Tx {} directCommit called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
+ bumpPermits(havePermit);
return sendBatchedModifications(true, true);
}
@Override
return sendBatchedModifications(true, true);
}
@Override
- public Future<ActorSelection> readyTransaction() {
+ public Future<ActorSelection> readyTransaction(final Boolean havePermit) {
logModificationCount();
LOG.debug("Tx {} readyTransaction called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
logModificationCount();
LOG.debug("Tx {} readyTransaction called", getIdentifier());
// Send the remaining batched modifications, if any, with the ready flag set.
+ bumpPermits(havePermit);
Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
return transformReadyReply(lastModificationsFuture);
}
Future<Object> lastModificationsFuture = sendBatchedModifications(true, false);
return transformReadyReply(lastModificationsFuture);
}
+ private void bumpPermits(final Boolean havePermit) {
+ if (Boolean.TRUE.equals(havePermit)) {
+ ++batchPermits;
+ }
+ }
+
protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
// Transform the last reply Future into a Future that returns the cohort actor path from
// the last reply message. That's the end result of the ready operation.
protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
// Transform the last reply Future into a Future that returns the cohort actor path from
// the last reply message. That's the end result of the ready operation.
return new BatchedModifications(getIdentifier(), getTransactionVersion());
}
return new BatchedModifications(getIdentifier(), getTransactionVersion());
}
- private void batchModification(Modification modification, boolean havePermit) {
+ private void batchModification(final Modification modification, final boolean havePermit) {
incrementModificationCount();
if (havePermit) {
++batchPermits;
incrementModificationCount();
if (havePermit) {
++batchPermits;
return sendBatchedModifications(false, false);
}
return sendBatchedModifications(false, false);
}
- protected Future<Object> sendBatchedModifications(boolean ready, boolean doCommitOnReady) {
+ protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) {
Future<Object> sent = null;
if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
if (batchedModifications == null) {
Future<Object> sent = null;
if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
if (batchedModifications == null) {
actorContext.getTransactionCommitOperationTimeout());
sent.onComplete(new OnComplete<Object>() {
@Override
actorContext.getTransactionCommitOperationTimeout());
sent.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object success) {
+ public void onComplete(final Throwable failure, final Object success) {
if (failure != null) {
LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
failedModification = failure;
if (failure != null) {
LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
failedModification = failure;
- public void executeModification(AbstractModification modification) {
+ public void executeModification(final AbstractModification modification, final Boolean havePermit) {
LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
modification.getClass().getSimpleName(), modification.getPath());
LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(),
modification.getClass().getSimpleName(), modification.getPath());
- final boolean havePermit = failedModification == null && acquireOperation();
- batchModification(modification, havePermit);
+ final boolean permitToRelease;
+ if (havePermit == null) {
+ permitToRelease = failedModification == null && acquireOperation();
+ } else {
+ permitToRelease = havePermit.booleanValue();
+ }
+
+ batchModification(modification, permitToRelease);
- public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture) {
+ public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
+ final Boolean havePermit) {
LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
readCmd.getPath());
LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
readCmd.getPath());
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
- final boolean havePermit = acquireOperation();
+ final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) {
+ public void onComplete(final Throwable failure, final Object response) {
// We have previously acquired an operation, now release it, no matter what happened
// We have previously acquired an operation, now release it, no matter what happened
* @return True if a permit was successfully acquired, false otherwise
*/
private boolean acquireOperation() {
* @return True if a permit was successfully acquired, false otherwise
*/
private boolean acquireOperation() {
- if (isOperationHandOffComplete()) {
- if (limiter.acquire()) {
- return true;
- }
+ Preconditions.checkState(isOperationHandOffComplete(),
+ "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff",
+ getIdentifier(), actor);
- LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(),
- actor);
+ if (limiter.acquire()) {
+ return true;
+ LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor);
interface TransactionContext {
void closeTransaction();
interface TransactionContext {
void closeTransaction();
- Future<ActorSelection> readyTransaction();
+ Future<ActorSelection> readyTransaction(Boolean havePermit);
- void executeModification(AbstractModification modification);
+ void executeModification(AbstractModification modification, Boolean havePermit);
- <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise);
+ <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise, Boolean havePermit);
- Future<Object> directCommit();
+ Future<Object> directCommit(Boolean havePermit);
/**
* Invoked by {@link TransactionContextWrapper} when it has finished handing
/**
* Invoked by {@link TransactionContextWrapper} when it has finished handing
private final TransactionContext cleanup;
private final TransactionContext cleanup;
- private TransactionContextCleanup(TransactionProxy referent, TransactionContext cleanup) {
+ private TransactionContextCleanup(final TransactionProxy referent, final TransactionContext cleanup) {
super(referent, QUEUE);
this.cleanup = cleanup;
}
super(referent, QUEUE);
this.cleanup = cleanup;
}
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import com.google.common.base.Preconditions;
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
* The list of transaction operations to execute once the TransactionContext becomes available.
*/
@GuardedBy("queuedTxOperations")
* The list of transaction operations to execute once the TransactionContext becomes available.
*/
@GuardedBy("queuedTxOperations")
- private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+ private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
private final TransactionIdentifier identifier;
private final TransactionIdentifier identifier;
+ private final OperationLimiter limiter;
private final String shardName;
/**
* The resulting TransactionContext.
*/
private volatile TransactionContext transactionContext;
private final String shardName;
/**
* The resulting TransactionContext.
*/
private volatile TransactionContext transactionContext;
-
- private final OperationLimiter limiter;
+ @GuardedBy("queuedTxOperations")
+ private TransactionContext deferredTransactionContext;
+ @GuardedBy("queuedTxOperations")
+ private boolean pendingEnqueue;
TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
final String shardName) {
TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
final String shardName) {
- * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
+ * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
+ * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
+ * context is not available.
*/
private void enqueueTransactionOperation(final TransactionOperation operation) {
*/
private void enqueueTransactionOperation(final TransactionOperation operation) {
- final boolean invokeOperation;
+ // We have three things to do here:
+ // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
+ // - acquire a permit for the operation if we still need to enqueue it
+ // - enqueue the operation
+ //
+ // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
+ // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
+ // complications are:
+ // - this method may be called from the thread invoking executePriorTransactionOperations()
+ // - user may be violating API contract of using the transaction from a single thread
+
+ // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
+ // the lock, we will assert that we will be enqueing another operation.
+ final TransactionContext contextOnEntry;
synchronized (queuedTxOperations) {
synchronized (queuedTxOperations) {
- if (transactionContext == null) {
- LOG.debug("Tx {} Queuing TransactionOperation", identifier);
-
- queuedTxOperations.add(operation);
- invokeOperation = false;
- } else {
- invokeOperation = true;
+ contextOnEntry = transactionContext;
+ if (contextOnEntry == null) {
+ Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
+ identifier);
+ pendingEnqueue = true;
- if (invokeOperation) {
- operation.invoke(transactionContext);
- } else {
- if (!limiter.acquire()) {
+ // Short-circuit if there is a context
+ if (contextOnEntry != null) {
+ operation.invoke(transactionContext, null);
+ return;
+ }
+
+ boolean cleanupEnqueue = true;
+ TransactionContext finishHandoff = null;
+ try {
+ // Acquire the permit,
+ final boolean havePermit = limiter.acquire();
+ if (!havePermit) {
LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
shardName);
}
LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
shardName);
}
+
+ // Ready to enqueue, take the lock again and append the operation
+ synchronized (queuedTxOperations) {
+ LOG.debug("Tx {} Queuing TransactionOperation", identifier);
+ queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
+ pendingEnqueue = false;
+ cleanupEnqueue = false;
+ finishHandoff = deferredTransactionContext;
+ deferredTransactionContext = null;
+ }
+ } finally {
+ if (cleanupEnqueue) {
+ synchronized (queuedTxOperations) {
+ pendingEnqueue = false;
+ finishHandoff = deferredTransactionContext;
+ deferredTransactionContext = null;
+ }
+ }
+ if (finishHandoff != null) {
+ executePriorTransactionOperations(finishHandoff);
+ }
}
}
void maybeExecuteTransactionOperation(final TransactionOperation op) {
}
}
void maybeExecuteTransactionOperation(final TransactionOperation op) {
-
- if (transactionContext != null) {
- op.invoke(transactionContext);
+ final TransactionContext localContext = transactionContext;
+ if (localContext != null) {
+ op.invoke(localContext, null);
} else {
// The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
// callback to be executed after the Tx is created.
} else {
// The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
// callback to be executed after the Tx is created.
// in case a TransactionOperation results in another transaction operation being
// queued (eg a put operation from a client read Future callback that is notified
// synchronously).
// in case a TransactionOperation results in another transaction operation being
// queued (eg a put operation from a client read Future callback that is notified
// synchronously).
- final Collection<TransactionOperation> operationsBatch;
+ final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
synchronized (queuedTxOperations) {
if (queuedTxOperations.isEmpty()) {
synchronized (queuedTxOperations) {
if (queuedTxOperations.isEmpty()) {
- // We're done invoking the TransactionOperations so we can now publish the
- // TransactionContext.
- localTransactionContext.operationHandOffComplete();
- if (!localTransactionContext.usesOperationLimiting()) {
- limiter.releaseAll();
+ if (!pendingEnqueue) {
+ // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
+ localTransactionContext.operationHandOffComplete();
+ if (!localTransactionContext.usesOperationLimiting()) {
+ limiter.releaseAll();
+ }
+
+ // This is null-to-non-null transition after which we are releasing the lock and not doing
+ // any further processing.
+ transactionContext = localTransactionContext;
+ } else {
+ deferredTransactionContext = localTransactionContext;
- transactionContext = localTransactionContext;
- break;
}
operationsBatch = new ArrayList<>(queuedTxOperations);
}
operationsBatch = new ArrayList<>(queuedTxOperations);
// Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
// A slight down-side is that we need to re-acquire the lock below but this should
// be negligible.
// Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
// A slight down-side is that we need to re-acquire the lock below but this should
// be negligible.
- for (TransactionOperation oper : operationsBatch) {
- oper.invoke(localTransactionContext);
+ for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
+ oper.getKey().invoke(localTransactionContext, oper.getValue());
}
}
}
Future<ActorSelection> readyTransaction() {
// avoid the creation of a promise and a TransactionOperation
}
}
}
Future<ActorSelection> readyTransaction() {
// avoid the creation of a promise and a TransactionOperation
- if (transactionContext != null) {
- return transactionContext.readyTransaction();
+ final TransactionContext localContext = transactionContext;
+ if (localContext != null) {
+ return localContext.readyTransaction(null);
}
final Promise<ActorSelection> promise = Futures.promise();
enqueueTransactionOperation(new TransactionOperation() {
@Override
}
final Promise<ActorSelection> promise = Futures.promise();
enqueueTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(TransactionContext newTransactionContext) {
- promise.completeWith(newTransactionContext.readyTransaction());
+ public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+ promise.completeWith(newTransactionContext.readyTransaction(havePermit));
}
});
return promise.future();
}
}
});
return promise.future();
}
- public OperationLimiter getLimiter() {
+ OperationLimiter getLimiter() {
*/
package org.opendaylight.controller.cluster.datastore;
*/
package org.opendaylight.controller.cluster.datastore;
+import org.eclipse.jdt.annotation.Nullable;
+
/**
* Abstract superclass for transaction operations which should be executed
* on a {@link TransactionContext} at a later point in time.
/**
* Abstract superclass for transaction operations which should be executed
* on a {@link TransactionContext} at a later point in time.
* Execute the delayed operation.
*
* @param transactionContext the TransactionContext
* Execute the delayed operation.
*
* @param transactionContext the TransactionContext
+ * @param havePermit Boolean indicator if this operation has tried and acquired a permit, null if there was no
+ * attempt to acquire a permit.
- protected abstract void invoke(TransactionContext transactionContext);
+ protected abstract void invoke(TransactionContext transactionContext, @Nullable Boolean havePermit);
TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(final TransactionContext transactionContext) {
- transactionContext.executeRead(readCmd, proxyFuture);
+ public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+ transactionContext.executeRead(readCmd, proxyFuture, havePermit);
TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- protected void invoke(final TransactionContext transactionContext) {
- transactionContext.executeModification(modification);
+ protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
+ transactionContext.executeModification(modification, havePermit);
for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) {
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
for (TransactionContextWrapper contextWrapper : txContextWrappers.values()) {
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(final TransactionContext transactionContext) {
+ public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
transactionContext.closeTransaction();
}
});
transactionContext.closeTransaction();
}
});
final Promise promise = akka.dispatch.Futures.promise();
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
final Promise promise = akka.dispatch.Futures.promise();
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(final TransactionContext newTransactionContext) {
- promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef));
+ public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
+ promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef,
+ havePermit));
}
});
future = promise.future();
} else {
// avoid the creation of a promise and a TransactionOperation
}
});
future = promise.future();
} else {
// avoid the creation of a promise and a TransactionOperation
- future = getDirectCommitFuture(transactionContext, operationCallbackRef);
+ future = getDirectCommitFuture(transactionContext, operationCallbackRef, null);
}
return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier(),
}
return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier(),
}
private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
}
private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
- final OperationCallback.Reference operationCallbackRef) {
+ final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) {
TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
txContextFactory.getActorContext());
operationCallbackRef.set(rateLimitingCallback);
rateLimitingCallback.run();
TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
txContextFactory.getActorContext());
operationCallbackRef.set(rateLimitingCallback);
rateLimitingCallback.run();
- return transactionContext.directCommit();
+ return transactionContext.directCommit(havePermit);
}
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
}
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
public void testWrite() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
public void testWrite() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+ null);
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
}
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
}
public void testMerge() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
public void testMerge() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+ null);
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
}
@Test
public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
}
@Test
public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
verify(readWriteTransaction).delete(yangInstanceIdentifier);
}
verify(readWriteTransaction).delete(yangInstanceIdentifier);
}
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction)
.read(yangInstanceIdentifier);
localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction)
.read(yangInstanceIdentifier);
localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
- SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+ SettableFuture.create(), null);
verify(readWriteTransaction).read(yangInstanceIdentifier);
}
verify(readWriteTransaction).read(yangInstanceIdentifier);
}
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY;
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
- SettableFuture.<Boolean>create());
+ SettableFuture.create(), null);
verify(readWriteTransaction).exists(yangInstanceIdentifier);
}
verify(readWriteTransaction).exists(yangInstanceIdentifier);
}
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
- Future<ActorSelection> future = localTransactionContext.readyTransaction();
+ Future<ActorSelection> future = localTransactionContext.readyTransaction(null);
assertTrue(future.isCompleted());
verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
assertTrue(future.isCompleted());
verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
- localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+ null);
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode),
+ null);
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
- localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+ null);
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode),
+ null);
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
- localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null);
verify(readWriteTransaction).delete(yangInstanceIdentifier);
doReadyWithExpectedError(error);
}
verify(readWriteTransaction).delete(yangInstanceIdentifier);
doReadyWithExpectedError(error);
}
- private void doReadyWithExpectedError(RuntimeException expError) {
+ private void doReadyWithExpectedError(final RuntimeException expError) {
LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
- localTransactionContext.readyTransaction();
+ localTransactionContext.readyTransaction(null);
*/
@Test
public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
*/
@Test
public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
- txContext.executeModification(DELETE);
- txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE, null);
+ txContext.executeModification(DELETE, null);
assertEquals(2, limiter.availablePermits());
Future<Object> future = txContext.sendBatchedModifications();
assertEquals(2, limiter.availablePermits());
Future<Object> future = txContext.sendBatchedModifications();
assertEquals(4, limiter.availablePermits());
// The transaction has failed, no throttling should occur
assertEquals(4, limiter.availablePermits());
// The transaction has failed, no throttling should occur
- txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE, null);
assertEquals(4, limiter.availablePermits());
// Executing a read should result in immediate failure
final SettableFuture<Boolean> readFuture = SettableFuture.create();
assertEquals(4, limiter.availablePermits());
// Executing a read should result in immediate failure
final SettableFuture<Boolean> readFuture = SettableFuture.create();
- txContext.executeRead(new DataExists(), readFuture);
+ txContext.executeRead(new DataExists(), readFuture, null);
assertTrue(readFuture.isDone());
try {
readFuture.get();
assertTrue(readFuture.isDone());
try {
readFuture.get();
- future = txContext.directCommit();
+ future = txContext.directCommit(null);
msg = kit.expectMsgClass(BatchedModifications.class);
// Modification should have been thrown away by the dropped transmit induced by executeRead()
msg = kit.expectMsgClass(BatchedModifications.class);
// Modification should have been thrown away by the dropped transmit induced by executeRead()
*/
@Test
public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
*/
@Test
public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
- txContext.executeModification(DELETE);
- txContext.executeModification(DELETE);
- txContext.executeModification(DELETE);
- txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE, null);
+ txContext.executeModification(DELETE, null);
+ txContext.executeModification(DELETE, null);
+ txContext.executeModification(DELETE, null);
assertEquals(0, limiter.availablePermits());
assertEquals(0, limiter.availablePermits());
- txContext.executeModification(DELETE);
+ txContext.executeModification(DELETE, null);
// Last acquire should have failed ...
assertEquals(0, limiter.availablePermits());
// Last acquire should have failed ...
assertEquals(0, limiter.availablePermits());