protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
- return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()));
+ return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()), parent.getCompleter());
}
}
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
*/
final class LocalTransactionContext extends AbstractTransactionContext {
private final DOMStoreReadWriteTransaction delegate;
+ private final OperationCompleter completer;
- LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate) {
+ LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate, OperationCompleter completer) {
super(identifier);
- this.delegate = delegate;
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.completer = Preconditions.checkNotNull(completer);
}
@Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ public void writeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
delegate.write(path, data);
+ completer.onComplete(null, null);
}
@Override
- public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ public void mergeData(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
delegate.merge(path, data);
+ completer.onComplete(null, null);
}
@Override
- public void deleteData(YangInstanceIdentifier path) {
+ public void deleteData(final YangInstanceIdentifier path) {
delegate.delete(path);
+ completer.onComplete(null, null);
}
@Override
- public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+ public void readData(final YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+
Futures.addCallback(delegate.read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
@Override
public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
proxyFuture.set(result);
+ completer.onComplete(null, null);
}
@Override
public void onFailure(Throwable t) {
proxyFuture.setException(t);
+ completer.onComplete(null, null);
}
});
}
@Override
- public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
+ public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
Futures.addCallback(delegate.exists(path), new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
proxyFuture.set(result);
+ completer.onComplete(null, null);
}
@Override
public void onFailure(Throwable t) {
proxyFuture.setException(t);
+ completer.onComplete(null, null);
}
});
}
private LocalThreePhaseCommitCohort ready() {
- return (LocalThreePhaseCommitCohort) delegate.ready();
+ LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) delegate.ready();
+ completer.onComplete(null, null);
+ return ready;
}
@Override
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.Semaphore;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class LocalTransactionContextTest {
+
+ @Mock
+ Semaphore limiter;
+
+ @Mock
+ TransactionIdentifier identifier;
+
+ @Mock
+ DOMStoreReadWriteTransaction readWriteTransaction;
+
+ LocalTransactionContext localTransactionContext;
+
+ @Before
+ public void setUp(){
+ MockitoAnnotations.initMocks(this);
+ localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, new OperationCompleter(limiter));
+ }
+
+ @Test
+ public void testWrite(){
+ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+ NormalizedNode normalizedNode = mock(NormalizedNode.class);
+ localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+ verify(limiter).release();
+ verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
+ }
+
+ @Test
+ public void testMerge(){
+ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+ NormalizedNode normalizedNode = mock(NormalizedNode.class);
+ localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+ verify(limiter).release();
+ verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
+ }
+
+ @Test
+ public void testDelete(){
+ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+ localTransactionContext.deleteData(yangInstanceIdentifier);
+ verify(limiter).release();
+ verify(readWriteTransaction).delete(yangInstanceIdentifier);
+ }
+
+
+ @Test
+ public void testRead(){
+ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+ NormalizedNode normalizedNode = mock(NormalizedNode.class);
+ doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
+ localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+ verify(limiter).release();
+ verify(readWriteTransaction).read(yangInstanceIdentifier);
+ }
+
+ @Test
+ public void testExists(){
+ YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
+ doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
+ localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
+ verify(limiter).release();
+ verify(readWriteTransaction).exists(yangInstanceIdentifier);
+ }
+
+ @Test
+ public void testReady(){
+ doReturn(mock(LocalThreePhaseCommitCohort.class)).when(readWriteTransaction).ready();
+ localTransactionContext.readyTransaction();
+ verify(limiter).release();
+ verify(readWriteTransaction).ready();
+ }
+
+
+}
\ No newline at end of file
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Promise;
return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), Optional.<DataTree>absent());
}
+ private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef, Optional<DataTree> dataTreeOptional){
+ return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), dataTreeOptional);
+ }
+
+
private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){
ActorSystem actorSystem = getSystem();
ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
expected, (end-start)), (end - start) <= expected);
}
+ private void completeOperationLocal(TransactionProxyOperation operation, Optional<DataTree> dataTreeOptional){
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTreeOptional))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
+
+ long start = System.nanoTime();
+
+ operation.run(transactionProxy);
+
+ long end = System.nanoTime();
+
+ long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds());
+ Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
+ expected, (end-start)), (end - start) <= expected);
+ }
+
+ private Optional<DataTree> createDataTree(){
+ DataTree dataTree = mock(DataTree.class);
+ Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
+ DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
+ DataTreeModification dataTreeModification = mock(DataTreeModification.class);
+
+ doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
+ doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
+
+ return dataTreeOptional;
+ }
+
+ private Optional<DataTree> createDataTree(NormalizedNode readResponse){
+ DataTree dataTree = mock(DataTree.class);
+ Optional<DataTree> dataTreeOptional = Optional.of(dataTree);
+ DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
+ DataTreeModification dataTreeModification = mock(DataTreeModification.class);
+
+ doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
+ doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
+ doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
+
+ return dataTreeOptional;
+ }
+
+
+ @Test
+ public void testWriteCompletionForLocalShard(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
+ completeOperationLocal(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ }
+ }, createDataTree());
+ }
+
@Test
public void testWriteThrottlingWhenShardFound(){
dataStoreContextBuilder.shardBatchedModificationCount(1);
}
+ @Test
+ public void testMergeCompletionForLocalShard(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
+ completeOperationLocal(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ }
+ }, createDataTree());
+ }
+
+
@Test
public void testDeleteThrottlingWhenShardFound(){
}, false);
}
+ @Test
+ public void testDeleteCompletionForLocalShard(){
+ dataStoreContextBuilder.shardBatchedModificationCount(1);
+ completeOperationLocal(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+ }
+ }, createDataTree());
+
+ }
+
@Test
public void testDeleteCompletion(){
dataStoreContextBuilder.shardBatchedModificationCount(1);
}
+ @Test
+ public void testReadCompletionForLocalShard(){
+ final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ completeOperationLocal(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+ }, createDataTree(nodeToRead));
+
+ }
+
+ @Test
+ public void testReadCompletionForLocalShardWhenExceptionOccurs(){
+ completeOperationLocal(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+ }
+ }, createDataTree());
+
+ }
+
@Test
public void testExistsThrottlingWhenShardFound(){
}
+ @Test
+ public void testExistsCompletionForLocalShard(){
+ final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ completeOperationLocal(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ transactionProxy.exists(TestModel.TEST_PATH);
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+ }, createDataTree(nodeToRead));
+
+ }
+
+ @Test
+ public void testExistsCompletionForLocalShardWhenExceptionOccurs(){
+ completeOperationLocal(new TransactionProxyOperation() {
+ @Override
+ public void run(TransactionProxy transactionProxy) {
+ transactionProxy.exists(TestModel.TEST_PATH);
+
+ transactionProxy.exists(TestModel.TEST_PATH);
+ }
+ }, createDataTree());
+
+ }
@Test
public void testReadyThrottling(){