From 123afd8b015173c459f4937c84eb2e91286f65a8 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 5 Apr 2018 16:49:06 +0200 Subject: [PATCH] Fix TransactionContextWrapper limiter accounting The fix for CONTROLLER-1814 wrecked the fragile OperationLimiter accounting between TransactionContextWrapper and RemoteTransactionContext. The problem is that during initial connect time TransactionContextWrapper acquires limiter operations and the state of the OperationLimiter is inherited by RemoteTransactionContext. This though means that we need to know which operation actually acquired a permit and which did not. As it turns out, this needs to make TransactionContext methods take an additional hint as to whether a limit attempt was made and what was the result of it. Furthermore the internal TransactionContextWrapper logic needs to be changed from 'enqueue and wait' to 'wait and enqueue' strategy, so when an operation is added to the queue and potentially picked up by executePriorTransactionOperations() we already know whether a permit was acquired or not. JIRA: CONTROLLER-1823 Change-Id: I78d43a1abde8c6da6e3da2f56823bba130499133 Signed-off-by: Robert Varga (cherry picked from commit b69500a51978c3d3ef639345a1a97a58cc3f6bb8) (cherry picked from commit dc295d9be77748d7e695d003a02d299d493abc8d) --- .../datastore/LocalTransactionContext.java | 9 +- .../datastore/NoOpTransactionContext.java | 11 +- .../datastore/RemoteTransactionContext.java | 56 +++++--- .../cluster/datastore/TransactionContext.java | 8 +- .../datastore/TransactionContextCleanup.java | 2 +- .../datastore/TransactionContextWrapper.java | 125 ++++++++++++------ .../datastore/TransactionOperation.java | 6 +- .../cluster/datastore/TransactionProxy.java | 21 +-- .../LocalTransactionContextTest.java | 34 +++-- .../RemoteTransactionContextTest.java | 20 +-- 10 files changed, 185 insertions(+), 107 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java index 9fad1f1016..8c4dddc565 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -45,7 +45,7 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { @Override @SuppressWarnings("checkstyle:IllegalCatch") - public void executeModification(final AbstractModification modification) { + public void executeModification(final AbstractModification modification, final Boolean havePermit) { incrementModificationCount(); if (operationError == null) { try { @@ -57,7 +57,8 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { } @Override - public void executeRead(final AbstractRead readCmd, final SettableFuture proxyFuture) { + public void executeRead(final AbstractRead readCmd, final SettableFuture proxyFuture, + final Boolean havePermit) { Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback() { @Override public void onSuccess(final T result) { @@ -77,13 +78,13 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { } @Override - public Future readyTransaction() { + public Future readyTransaction(final Boolean havePermit) { final LocalThreePhaseCommitCohort cohort = ready(); return cohort.initiateCoordinatedCommit(); } @Override - public Future directCommit() { + public Future directCommit(final Boolean havePermit) { final LocalThreePhaseCommitCohort cohort = ready(); return cohort.initiateDirectCommit(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index b9c2e13563..d14a936ac3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -24,7 +24,7 @@ final class NoOpTransactionContext extends AbstractTransactionContext { private final Throwable failure; - NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) { + NoOpTransactionContext(final Throwable failure, final TransactionIdentifier identifier) { super(identifier); this.failure = failure; } @@ -35,25 +35,26 @@ final class NoOpTransactionContext extends AbstractTransactionContext { } @Override - public Future directCommit() { + public Future directCommit(final Boolean havePermit) { LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure); return akka.dispatch.Futures.failed(failure); } @Override - public Future readyTransaction() { + public Future readyTransaction(final Boolean havePermit) { LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure); return akka.dispatch.Futures.failed(failure); } @Override - public void executeModification(AbstractModification modification) { + public void executeModification(final AbstractModification modification, final Boolean havePermit) { LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass().getSimpleName(), modification.getPath()); } @Override - public void executeRead(AbstractRead readCmd, SettableFuture proxyFuture) { + public void executeRead(final AbstractRead readCmd, final SettableFuture proxyFuture, + final Boolean havePermit) { LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 941c86116c..27969b3e8e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -50,8 +50,8 @@ public class RemoteTransactionContext extends AbstractTransactionContext { */ private volatile Throwable failedModification; - protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor, - ActorContext actorContext, short remoteTransactionVersion, OperationLimiter limiter) { + protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, + final ActorContext actorContext, final short remoteTransactionVersion, final OperationLimiter limiter) { super(identifier, remoteTransactionVersion); this.limiter = Preconditions.checkNotNull(limiter); this.actor = actor; @@ -75,27 +75,34 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public Future directCommit() { + public Future directCommit(final Boolean havePermit) { LOG.debug("Tx {} directCommit called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. - + bumpPermits(havePermit); return sendBatchedModifications(true, true); } @Override - public Future readyTransaction() { + public Future readyTransaction(final Boolean havePermit) { logModificationCount(); LOG.debug("Tx {} readyTransaction called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. + bumpPermits(havePermit); Future lastModificationsFuture = sendBatchedModifications(true, false); return transformReadyReply(lastModificationsFuture); } + private void bumpPermits(final Boolean havePermit) { + if (Boolean.TRUE.equals(havePermit)) { + ++batchPermits; + } + } + protected Future transformReadyReply(final Future readyReplyFuture) { // Transform the last reply Future into a Future that returns the cohort actor path from // the last reply message. That's the end result of the ready operation. @@ -107,7 +114,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { return new BatchedModifications(getIdentifier(), getTransactionVersion()); } - private void batchModification(Modification modification, boolean havePermit) { + private void batchModification(final Modification modification, final boolean havePermit) { incrementModificationCount(); if (havePermit) { ++batchPermits; @@ -129,7 +136,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { return sendBatchedModifications(false, false); } - protected Future sendBatchedModifications(boolean ready, boolean doCommitOnReady) { + protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady) { Future sent = null; if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) { if (batchedModifications == null) { @@ -167,7 +174,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { actorContext.getTransactionCommitOperationTimeout()); sent.onComplete(new OnComplete() { @Override - public void onComplete(Throwable failure, Object success) { + public void onComplete(final Throwable failure, final Object success) { if (failure != null) { LOG.debug("Tx {} modifications failed", getIdentifier(), failure); failedModification = failure; @@ -183,16 +190,23 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public void executeModification(AbstractModification modification) { + public void executeModification(final AbstractModification modification, final Boolean havePermit) { LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass().getSimpleName(), modification.getPath()); - final boolean havePermit = failedModification == null && acquireOperation(); - batchModification(modification, havePermit); + final boolean permitToRelease; + if (havePermit == null) { + permitToRelease = failedModification == null && acquireOperation(); + } else { + permitToRelease = havePermit.booleanValue(); + } + + batchModification(modification, permitToRelease); } @Override - public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture) { + public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, + final Boolean havePermit) { LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); @@ -209,14 +223,14 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. - final boolean havePermit = acquireOperation(); + final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue(); sendBatchedModifications(); OnComplete onComplete = new OnComplete() { @Override - public void onComplete(Throwable failure, Object response) { + public void onComplete(final Throwable failure, final Object response) { // We have previously acquired an operation, now release it, no matter what happened - if (havePermit) { + if (permitToRelease) { limiter.release(); } @@ -245,15 +259,15 @@ public class RemoteTransactionContext extends AbstractTransactionContext { * @return True if a permit was successfully acquired, false otherwise */ private boolean acquireOperation() { - if (isOperationHandOffComplete()) { - if (limiter.acquire()) { - return true; - } + Preconditions.checkState(isOperationHandOffComplete(), + "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff", + getIdentifier(), actor); - LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), - actor); + if (limiter.acquire()) { + return true; } + LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor); return false; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index b62b056d58..543834c2cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -20,13 +20,13 @@ import scala.concurrent.Future; interface TransactionContext { void closeTransaction(); - Future readyTransaction(); + Future readyTransaction(Boolean havePermit); - void executeModification(AbstractModification modification); + void executeModification(AbstractModification modification, Boolean havePermit); - void executeRead(AbstractRead readCmd, SettableFuture promise); + void executeRead(AbstractRead readCmd, SettableFuture promise, Boolean havePermit); - Future directCommit(); + Future directCommit(Boolean havePermit); /** * Invoked by {@link TransactionContextWrapper} when it has finished handing diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java index 865cc60edb..d4b52e5252 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java @@ -42,7 +42,7 @@ final class TransactionContextCleanup extends FinalizablePhantomReference queuedTxOperations = Lists.newArrayList(); + private final List> queuedTxOperations = new ArrayList<>(); private final TransactionIdentifier identifier; + private final OperationLimiter limiter; private final String shardName; /** * The resulting TransactionContext. */ private volatile TransactionContext transactionContext; - - private final OperationLimiter limiter; + @GuardedBy("queuedTxOperations") + private TransactionContext deferredTransactionContext; + @GuardedBy("queuedTxOperations") + private boolean pendingEnqueue; TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext, final String shardName) { @@ -67,35 +71,77 @@ class TransactionContextWrapper { } /** - * Adds a TransactionOperation to be executed once the TransactionContext becomes available. + * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called + * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the + * context is not available. */ private void enqueueTransactionOperation(final TransactionOperation operation) { - final boolean invokeOperation; + // We have three things to do here: + // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained + // - acquire a permit for the operation if we still need to enqueue it + // - enqueue the operation + // + // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the + // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further + // complications are: + // - this method may be called from the thread invoking executePriorTransactionOperations() + // - user may be violating API contract of using the transaction from a single thread + + // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have + // the lock, we will assert that we will be enqueing another operation. + final TransactionContext contextOnEntry; synchronized (queuedTxOperations) { - if (transactionContext == null) { - LOG.debug("Tx {} Queuing TransactionOperation", identifier); - - queuedTxOperations.add(operation); - invokeOperation = false; - } else { - invokeOperation = true; + contextOnEntry = transactionContext; + if (contextOnEntry == null) { + Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", + identifier); + pendingEnqueue = true; } } - if (invokeOperation) { - operation.invoke(transactionContext); - } else { - if (!limiter.acquire()) { + // Short-circuit if there is a context + if (contextOnEntry != null) { + operation.invoke(transactionContext, null); + return; + } + + boolean cleanupEnqueue = true; + TransactionContext finishHandoff = null; + try { + // Acquire the permit, + final boolean havePermit = limiter.acquire(); + if (!havePermit) { LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier, shardName); } + + // Ready to enqueue, take the lock again and append the operation + synchronized (queuedTxOperations) { + LOG.debug("Tx {} Queuing TransactionOperation", identifier); + queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit)); + pendingEnqueue = false; + cleanupEnqueue = false; + finishHandoff = deferredTransactionContext; + deferredTransactionContext = null; + } + } finally { + if (cleanupEnqueue) { + synchronized (queuedTxOperations) { + pendingEnqueue = false; + finishHandoff = deferredTransactionContext; + deferredTransactionContext = null; + } + } + if (finishHandoff != null) { + executePriorTransactionOperations(finishHandoff); + } } } void maybeExecuteTransactionOperation(final TransactionOperation op) { - - if (transactionContext != null) { - op.invoke(transactionContext); + final TransactionContext localContext = transactionContext; + if (localContext != null) { + op.invoke(localContext, null); } else { // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future // callback to be executed after the Tx is created. @@ -114,17 +160,23 @@ class TransactionContextWrapper { // in case a TransactionOperation results in another transaction operation being // queued (eg a put operation from a client read Future callback that is notified // synchronously). - final Collection operationsBatch; + final Collection> operationsBatch; synchronized (queuedTxOperations) { if (queuedTxOperations.isEmpty()) { - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - localTransactionContext.operationHandOffComplete(); - if (!localTransactionContext.usesOperationLimiting()) { - limiter.releaseAll(); + if (!pendingEnqueue) { + // We're done invoking the TransactionOperations so we can now publish the TransactionContext. + localTransactionContext.operationHandOffComplete(); + if (!localTransactionContext.usesOperationLimiting()) { + limiter.releaseAll(); + } + + // This is null-to-non-null transition after which we are releasing the lock and not doing + // any further processing. + transactionContext = localTransactionContext; + } else { + deferredTransactionContext = localTransactionContext; } - transactionContext = localTransactionContext; - break; + return; } operationsBatch = new ArrayList<>(queuedTxOperations); @@ -134,32 +186,31 @@ class TransactionContextWrapper { // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. // A slight down-side is that we need to re-acquire the lock below but this should // be negligible. - for (TransactionOperation oper : operationsBatch) { - oper.invoke(localTransactionContext); + for (Entry oper : operationsBatch) { + oper.getKey().invoke(localTransactionContext, oper.getValue()); } } } Future readyTransaction() { // avoid the creation of a promise and a TransactionOperation - if (transactionContext != null) { - return transactionContext.readyTransaction(); + final TransactionContext localContext = transactionContext; + if (localContext != null) { + return localContext.readyTransaction(null); } final Promise promise = Futures.promise(); enqueueTransactionOperation(new TransactionOperation() { @Override - public void invoke(TransactionContext newTransactionContext) { - promise.completeWith(newTransactionContext.readyTransaction()); + public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { + promise.completeWith(newTransactionContext.readyTransaction(havePermit)); } }); return promise.future(); } - public OperationLimiter getLimiter() { + OperationLimiter getLimiter() { return limiter; } - - } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java index 3defd2cc50..962d26133b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionOperation.java @@ -7,6 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore; +import org.eclipse.jdt.annotation.Nullable; + /** * Abstract superclass for transaction operations which should be executed * on a {@link TransactionContext} at a later point in time. @@ -16,6 +18,8 @@ abstract class TransactionOperation { * Execute the delayed operation. * * @param transactionContext the TransactionContext + * @param havePermit Boolean indicator if this operation has tried and acquired a permit, null if there was no + * attempt to acquire a permit. */ - protected abstract void invoke(TransactionContext transactionContext); + protected abstract void invoke(TransactionContext transactionContext, @Nullable Boolean havePermit); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 7bb62aa3cf..aca4e41d80 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -92,8 +92,8 @@ public class TransactionProxy extends AbstractDOMStoreTransaction getDirectCommitFuture(final TransactionContext transactionContext, - final OperationCallback.Reference operationCallbackRef) { + final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) { TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback( txContextFactory.getActorContext()); operationCallbackRef.set(rateLimitingCallback); rateLimitingCallback.run(); - return transactionContext.directCommit(); + return transactionContext.directCommit(havePermit); } private AbstractThreePhaseCommitCohort createMultiCommitCohort( diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java index 016b4b7927..e63501de6c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -68,7 +68,8 @@ public class LocalTransactionContextTest { public void testWrite() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY; NormalizedNode normalizedNode = mock(NormalizedNode.class); - localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode)); + localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode), + null); verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); } @@ -76,14 +77,15 @@ public class LocalTransactionContextTest { public void testMerge() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY; NormalizedNode normalizedNode = mock(NormalizedNode.class); - localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode)); + localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode), + null); verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); } @Test public void testDelete() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY; - localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier)); + localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null); verify(readWriteTransaction).delete(yangInstanceIdentifier); } @@ -95,7 +97,7 @@ public class LocalTransactionContextTest { doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction) .read(yangInstanceIdentifier); localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION), - SettableFuture.>>create()); + SettableFuture.create(), null); verify(readWriteTransaction).read(yangInstanceIdentifier); } @@ -104,7 +106,7 @@ public class LocalTransactionContextTest { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.EMPTY; doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier); localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION), - SettableFuture.create()); + SettableFuture.create(), null); verify(readWriteTransaction).exists(yangInstanceIdentifier); } @@ -114,7 +116,7 @@ public class LocalTransactionContextTest { doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(); doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null); - Future future = localTransactionContext.readyTransaction(); + Future future = localTransactionContext.readyTransaction(null); assertTrue(future.isCompleted()); verify(mockReadySupport).onTransactionReady(readWriteTransaction, null); @@ -127,8 +129,10 @@ public class LocalTransactionContextTest { RuntimeException error = new RuntimeException("mock"); doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); - localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode)); - localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode)); + localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode), + null); + localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode), + null); verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); @@ -142,8 +146,10 @@ public class LocalTransactionContextTest { RuntimeException error = new RuntimeException("mock"); doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); - localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode)); - localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode)); + localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode), + null); + localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode), + null); verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); @@ -156,19 +162,19 @@ public class LocalTransactionContextTest { RuntimeException error = new RuntimeException("mock"); doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier); - localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier)); - localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier)); + localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null); + localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier), null); verify(readWriteTransaction).delete(yangInstanceIdentifier); doReadyWithExpectedError(error); } - private void doReadyWithExpectedError(RuntimeException expError) { + private void doReadyWithExpectedError(final RuntimeException expError) { LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class); doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(); doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError); - localTransactionContext.readyTransaction(); + localTransactionContext.readyTransaction(null); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java index 526ad77b78..3f9b81343d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextTest.java @@ -72,8 +72,8 @@ public class RemoteTransactionContextTest extends AbstractActorTest { */ @Test public void testLimiterOnFailure() throws TimeoutException, InterruptedException { - txContext.executeModification(DELETE); - txContext.executeModification(DELETE); + txContext.executeModification(DELETE, null); + txContext.executeModification(DELETE, null); assertEquals(2, limiter.availablePermits()); Future future = txContext.sendBatchedModifications(); @@ -90,12 +90,12 @@ public class RemoteTransactionContextTest extends AbstractActorTest { assertEquals(4, limiter.availablePermits()); // The transaction has failed, no throttling should occur - txContext.executeModification(DELETE); + txContext.executeModification(DELETE, null); assertEquals(4, limiter.availablePermits()); // Executing a read should result in immediate failure final SettableFuture readFuture = SettableFuture.create(); - txContext.executeRead(new DataExists(), readFuture); + txContext.executeRead(new DataExists(), readFuture, null); assertTrue(readFuture.isDone()); try { readFuture.get(); @@ -106,7 +106,7 @@ public class RemoteTransactionContextTest extends AbstractActorTest { } }); - future = txContext.directCommit(); + future = txContext.directCommit(null); msg = kit.expectMsgClass(BatchedModifications.class); // Modification should have been thrown away by the dropped transmit induced by executeRead() @@ -131,12 +131,12 @@ public class RemoteTransactionContextTest extends AbstractActorTest { */ @Test public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException { - txContext.executeModification(DELETE); - txContext.executeModification(DELETE); - txContext.executeModification(DELETE); - txContext.executeModification(DELETE); + txContext.executeModification(DELETE, null); + txContext.executeModification(DELETE, null); + txContext.executeModification(DELETE, null); + txContext.executeModification(DELETE, null); assertEquals(0, limiter.availablePermits()); - txContext.executeModification(DELETE); + txContext.executeModification(DELETE, null); // Last acquire should have failed ... assertEquals(0, limiter.availablePermits()); -- 2.36.6