*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract class AbstractTransactionContext implements TransactionContext {
-
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
-
+ private final OperationLimiter limiter;
private long modificationCount = 0;
+ private boolean handoffComplete;
+
+ protected AbstractTransactionContext(final OperationLimiter limiter) {
+ this.limiter = Preconditions.checkNotNull(limiter);
+ }
- private final TransactionIdentifier identifier;
+ /**
+ * Get the transaction identifier associated with this context.
+ *
+ * @return Transaction identifier.
+ */
+ @Nonnull protected final TransactionIdentifier getIdentifier() {
+ return limiter.getIdentifier();
+ }
- protected AbstractTransactionContext(TransactionIdentifier identifier) {
- this.identifier = identifier;
+ /**
+ * Return the operation limiter associated with this context.
+ * @return Operation limiter.
+ */
+ @Nonnull protected final OperationLimiter getLimiter() {
+ return limiter;
}
- protected final TransactionIdentifier getIdentifier() {
- return identifier;
+ /**
+ * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}.
+ *
+ * @return True if this context is responsible for throttling.
+ */
+ protected final boolean isOperationHandoffComplete() {
+ return handoffComplete;
}
- protected void incrementModificationCount(){
+ /**
+ * Acquire operation from the limiter if the handoff has completed. If
+ * the handoff is still ongoing, this method does nothing.
+ */
+ protected final void acquireOperation() {
+ if (handoffComplete) {
+ limiter.acquire();
+ }
+ }
+
+ /**
+ * Acquire operation from the limiter if the handoff has NOT completed. If
+ * the handoff has completed, this method does nothing.
+ */
+ protected final void releaseOperation() {
+ if (!handoffComplete) {
+ limiter.release();
+ }
+ }
+
+ protected final void incrementModificationCount() {
modificationCount++;
}
- protected void logModificationCount(){
- LOG.debug("Total modifications on Tx {} = [ {} ]", identifier, modificationCount);
+ protected final void logModificationCount() {
+ LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount);
+ }
+
+ @Override
+ public final void operationHandoffComplete() {
+ handoffComplete = true;
}
-}
\ No newline at end of file
+}
LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
- parent.getIdentifier(), parent.getLimiter()));
+ parent.getLimiter()));
}
final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
- final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
+ final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getLimiter());
Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
if(findPrimaryFuture.isCompleted()) {
switch(parent.getType()) {
case READ_ONLY:
final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), readOnly, parent.getLimiter()) {
+ return new LocalTransactionContext(readOnly, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
throw new UnsupportedOperationException();
};
case READ_WRITE:
final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), readWrite, parent.getLimiter()) {
+ return new LocalTransactionContext(readWrite, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWrite;
};
case WRITE_ONLY:
final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
- return new LocalTransactionContext(parent.getIdentifier(), writeOnly, parent.getLimiter()) {
+ return new LocalTransactionContext(writeOnly, parent.getLimiter()) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return writeOnly;
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
return actorContext.executeOperationAsync(leader, message, actorContext.getTransactionCommitOperationTimeout());
}
+ /**
+ * Return the {@link ActorContext} associated with this object.
+ *
+ * @return An actor context instance.
+ */
+ @Nonnull ActorContext getActorContext() {
+ return actorContext;
+ }
+
Future<ActorSelection> initiateCoordinatedCommit() {
final Future<Object> messageFuture = initiateCommit(false);
final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
* @author Thomas Pantelis
*/
abstract class LocalTransactionContext extends AbstractTransactionContext {
-
private final DOMStoreTransaction txDelegate;
- private final OperationLimiter limiter;
- LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationLimiter limiter) {
- super(identifier);
+ LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) {
+ super(limiter);
this.txDelegate = Preconditions.checkNotNull(txDelegate);
- this.limiter = Preconditions.checkNotNull(limiter);
}
protected abstract DOMStoreWriteTransaction getWriteDelegate();
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().write(path, data);
- limiter.release();
+ releaseOperation();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
incrementModificationCount();
getWriteDelegate().merge(path, data);
- limiter.release();
+ releaseOperation();
}
@Override
public void deleteData(YangInstanceIdentifier path) {
incrementModificationCount();
getWriteDelegate().delete(path);
- limiter.release();
+ releaseOperation();
}
@Override
public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
@Override
- public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+ public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
proxyFuture.set(result);
- limiter.release();
+ releaseOperation();
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
proxyFuture.setException(t);
- limiter.release();
+ releaseOperation();
}
});
}
public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
@Override
- public void onSuccess(Boolean result) {
+ public void onSuccess(final Boolean result) {
proxyFuture.set(result);
- limiter.release();
+ releaseOperation();
}
@Override
- public void onFailure(Throwable t) {
+ public void onFailure(final Throwable t) {
proxyFuture.setException(t);
- limiter.release();
+ releaseOperation();
}
});
}
private LocalThreePhaseCommitCohort ready() {
logModificationCount();
- LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
- limiter.release();
- return ready;
+ acquireOperation();
+ return (LocalThreePhaseCommitCohort) getWriteDelegate().ready();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private <T extends Future> T completeOperation(final ActorContext actorContext, final T operationFuture) {
+ operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
+ return operationFuture;
}
@Override
public Future<ActorSelection> readyTransaction() {
- return ready().initiateCoordinatedCommit();
+ final LocalThreePhaseCommitCohort cohort = ready();
+ return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit());
}
@Override
public Future<Object> directCommit() {
- return ready().initiateDirectCommit();
+ final LocalThreePhaseCommitCohort cohort = ready();
+ return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit());
}
@Override
@Override
public void closeTransaction() {
txDelegate.close();
+ releaseOperation();
}
}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
private final Throwable failure;
- private final OperationLimiter operationLimiter;
- public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, OperationLimiter operationLimiter) {
- super(identifier);
+ public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) {
+ super(limiter);
this.failure = failure;
- this.operationLimiter = operationLimiter;
}
@Override
@Override
public Future<Object> directCommit() {
LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure);
- operationLimiter.release();
+ releaseOperation();
return akka.dispatch.Futures.failed(failure);
}
@Override
public Future<ActorSelection> readyTransaction() {
LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure);
- operationLimiter.release();
+ releaseOperation();
return akka.dispatch.Futures.failed(failure);
}
@Override
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
- operationLimiter.release();
+ releaseOperation();
}
@Override
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
- operationLimiter.release();
+ releaseOperation();
}
@Override
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
- operationLimiter.release();
+ releaseOperation();
}
@Override
public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
- operationLimiter.release();
- Throwable t;
- if(failure instanceof NoShardLeaderException) {
+ releaseOperation();
+
+ final Throwable t;
+ if (failure instanceof NoShardLeaderException) {
t = new DataStoreUnavailableException(failure.getMessage(), failure);
} else {
t = failure;
@Override
public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
- operationLimiter.release();
+ releaseOperation();
proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
}
}
import akka.dispatch.OnComplete;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
private final boolean isTxActorLocal;
private final short remoteTransactionVersion;
- private final OperationLimiter operationCompleter;
private BatchedModifications batchedModifications;
private int totalBatchedModificationsSent;
- protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
+ protected RemoteTransactionContext(ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
- super(identifier);
+ super(limiter);
this.actor = actor;
this.actorContext = actorContext;
this.isTxActorLocal = isTxActorLocal;
this.remoteTransactionVersion = remoteTransactionVersion;
- this.operationCompleter = limiter;
}
private Future<Object> completeOperation(Future<Object> operationFuture){
- operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher());
+ operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher());
return operationFuture;
}
-
private ActorSelection getActor() {
return actor;
}
public void deleteData(YangInstanceIdentifier path) {
LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new DeleteModification(path));
}
public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new MergeModification(path, data));
}
public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
+ acquireOperation();
batchModification(new WriteModification(path, data));
}
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
+ acquireOperation();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
+ acquireOperation();
sendBatchedModifications();
OnComplete<Object> onComplete = new OnComplete<Object>() {
if(failure != null) {
LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
- localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
+ localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter());
} else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
localTransactionContext = createValidTransactionContext(
CreateTransactionReply.fromSerializable(response));
IllegalArgumentException exception = new IllegalArgumentException(String.format(
"Invalid reply type %s for CreateTransaction", response.getClass()));
- localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
+ localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter());
}
transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
final TransactionContext ret;
if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
- ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+ ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor,
getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
} else {
- ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
+ ret = new RemoteTransactionContext(transactionActor, getActorContext(),
isTxActorLocal, remoteTransactionVersion, parent.getLimiter());
}
boolean supportsDirectCommit();
Future<Object> directCommit();
+
+ /**
+ * Invoked by {@link TransactionContextWrapper} when it has finished handing
+ * off operations to this context. From this point on, the context is responsible
+ * for throttling operations.
+ *
+ * Implementations can rely on the wrapper calling this operation in a synchronized
+ * block, so they do not need to ensure visibility of this state transition themselves.
+ */
+ void operationHandoffComplete();
}
*/
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorSelection;
+import akka.dispatch.Futures;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
/**
* A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
*/
private volatile TransactionContext transactionContext;
- private final TransactionIdentifier identifier;
+ private final OperationLimiter limiter;
- TransactionContextWrapper(final TransactionIdentifier identifier) {
- this.identifier = identifier;
+ TransactionContextWrapper(final OperationLimiter limiter) {
+ this.limiter = Preconditions.checkNotNull(limiter);
}
TransactionContext getTransactionContext() {
}
TransactionIdentifier getIdentifier() {
- return identifier;
+ return limiter.getIdentifier();
}
/**
if (invokeOperation) {
operation.invoke(transactionContext);
+ } else {
+ limiter.acquire();
}
}
// queued (eg a put operation from a client read Future callback that is notified
// synchronously).
Collection<TransactionOperation> operationsBatch = null;
- synchronized(queuedTxOperations) {
- if(queuedTxOperations.isEmpty()) {
+ synchronized (queuedTxOperations) {
+ if (queuedTxOperations.isEmpty()) {
// We're done invoking the TransactionOperations so we can now publish the
// TransactionContext.
+ localTransactionContext.operationHandoffComplete();
transactionContext = localTransactionContext;
break;
}
// Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
// A slight down-side is that we need to re-acquire the lock below but this should
// be negligible.
- for(TransactionOperation oper: operationsBatch) {
+ for (TransactionOperation oper : operationsBatch) {
oper.invoke(localTransactionContext);
}
}
}
+
+ Future<ActorSelection> readyTransaction() {
+ // avoid the creation of a promise and a TransactionOperation
+ if (transactionContext != null) {
+ return transactionContext.readyTransaction();
+ }
+
+ final Promise<ActorSelection> promise = Futures.promise();
+ enqueueTransactionOperation(new TransactionOperation() {
+ @Override
+ public void invoke(TransactionContext transactionContext) {
+ promise.completeWith(transactionContext.readyTransaction());
+ }
+ });
+
+ return promise.future();
+ }
}
LOG.debug("Tx {} exists {}", getIdentifier(), path);
- limiter.acquire();
-
final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
if (YangInstanceIdentifier.EMPTY.equals(path)) {
return readAllData();
} else {
- limiter.acquire();
-
return singleShardRead(shardNameFromIdentifier(path), path);
}
}
LOG.debug("Tx {} delete {}", getIdentifier(), path);
- limiter.acquire();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} merge {}", getIdentifier(), path);
- limiter.acquire();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
LOG.debug("Tx {} write {}", getIdentifier(), path);
- limiter.acquire();
-
TransactionContextWrapper contextAdapter = getContextAdapter(path);
contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
final TransactionContextWrapper contextAdapter) {
- limiter.acquire();
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
- limiter.acquire();
final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
- TransactionContextWrapper contextAdapter = e.getValue();
- final TransactionContext transactionContext = contextAdapter.getTransactionContext();
- Future<ActorSelection> future;
- if (transactionContext != null) {
- // avoid the creation of a promise and a TransactionOperation
- future = transactionContext.readyTransaction();
- } else {
- final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
- contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- promise.completeWith(transactionContext.readyTransaction());
- }
- });
-
- future = promise.future();
- }
-
- cohortFutures.add(future);
+ cohortFutures.add(e.getValue().readyTransaction());
}
return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.OperationLimiter;
import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
private final String transactionPath;
- public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
+ public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor,
ActorContext actorContext, boolean isTxActorLocal,
short remoteTransactionVersion, OperationLimiter limiter) {
- super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
+ super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter);
this.transactionPath = transactionPath;
}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import akka.dispatch.ExecutionContexts;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
LocalTransactionContext localTransactionContext;
@Before
- public void setUp(){
+ public void setUp() {
MockitoAnnotations.initMocks(this);
- localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, limiter) {
+ localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) {
@Override
protected DOMStoreWriteTransaction getWriteDelegate() {
return readWriteTransaction;
}
@Test
- public void testWrite(){
+ public void testWrite() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
}
@Test
- public void testMerge(){
+ public void testMerge() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
}
@Test
- public void testDelete(){
+ public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
localTransactionContext.deleteData(yangInstanceIdentifier);
verify(limiter).release();
@Test
- public void testRead(){
+ public void testRead() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
}
@Test
- public void testExists(){
+ public void testExists() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
}
@Test
- public void testReady(){
- doReturn(mock(LocalThreePhaseCommitCohort.class)).when(readWriteTransaction).ready();
+ public void testReady() {
+ final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
+ final ActorContext mockContext = mock(ActorContext.class);
+ doReturn(mockContext).when(mockCohort).getActorContext();
+ doReturn(ExecutionContexts.fromExecutor(MoreExecutors.directExecutor())).when(mockContext).getClientDispatcher();
+ doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit();
+ doReturn(mockCohort).when(readWriteTransaction).ready();
localTransactionContext.readyTransaction();
- verify(limiter).release();
+ verify(limiter).onComplete(null, null);
verify(readWriteTransaction).ready();
}