X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=fa2f9187d6059f1585a1475531556d8563db3c5a;hb=32b25203819eb02df22abfecdcc86896c068f778;hp=79edd19bba3328034ea313baa28333ba398226af;hpb=3c298061e7c22390eaf790aae977decfe0f94ad1;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 79edd19bba..fa2f9187d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -25,11 +25,15 @@ import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -72,6 +76,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.Promise; import scala.concurrent.duration.Duration; @SuppressWarnings("resource") @@ -124,6 +129,7 @@ public class TransactionProxyTest { DatastoreContext dataStoreContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(2).build(); doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(schemaContext).when(mockActorContext).getSchemaContext(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); @@ -370,8 +376,12 @@ public class TransactionProxyTest { return Futures.successful(new MergeDataReply()); } + private Future deleteSerializedDataReply(short version) { + return Futures.successful(new DeleteDataReply().toSerializable(version)); + } + private Future deleteSerializedDataReply() { - return Futures.successful(new DeleteDataReply().toSerializable()); + return deleteSerializedDataReply(DataStoreVersions.CURRENT_VERSION); } private Future deleteDataReply() { @@ -390,8 +400,7 @@ public class TransactionProxyTest { .build(); } - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, - TransactionType type, int transactionVersion) { + private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); @@ -399,10 +408,6 @@ public class TransactionProxyTest { doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), - eqCreateTransaction(memberName, type)); - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); @@ -410,6 +415,17 @@ public class TransactionProxyTest { return actorRef; } + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, + TransactionType type, int transactionVersion) { + ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); + + doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), + eqCreateTransaction(memberName, type)); + + return actorRef; + } + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION); } @@ -768,6 +784,62 @@ public class TransactionProxyTest { WriteDataReply.class); } + @Test + public void testWriteAfterAsyncRead() throws Throwable { + ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem()); + + Promise createTxPromise = akka.dispatch.Futures.promise(); + doReturn(createTxPromise).when(mockActorContext).executeOperationAsync( + eq(getSystem().actorSelection(actorRef.path())), + eqCreateTransaction(memberName, READ_WRITE)); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + final CountDownLatch readComplete = new CountDownLatch(1); + final AtomicReference caughtEx = new AtomicReference<>(); + com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH), + new FutureCallback>>() { + @Override + public void onSuccess(Optional> result) { + try { + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } catch (Exception e) { + caughtEx.set(e); + } finally { + readComplete.countDown(); + } + } + + @Override + public void onFailure(Throwable t) { + caughtEx.set(t); + readComplete.countDown(); + } + }); + + createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION)); + + Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS); + + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.class); + } + @Test(expected=IllegalStateException.class) public void testWritePreConditionCheck() { @@ -826,7 +898,7 @@ public class TransactionProxyTest { eq(actorSelection(actorRef)), eqSerializedDeleteData()); verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - DeleteDataReply.SERIALIZABLE_CLASS); + DeleteDataReply.class); } private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, @@ -1274,15 +1346,15 @@ public class TransactionProxyTest { TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - long start = System.currentTimeMillis(); + long start = System.nanoTime(); operation.run(transactionProxy); - long end = System.currentTimeMillis(); + long end = System.nanoTime(); Assert.assertTrue(String.format("took less time than expected %s was %s", - mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000, - (end-start)), (end - start) > mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000); + TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()), + (end-start)), (end - start) > TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds())); } @@ -1321,15 +1393,15 @@ public class TransactionProxyTest { TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); - long start = System.currentTimeMillis(); + long start = System.nanoTime(); operation.run(transactionProxy); - long end = System.currentTimeMillis(); + long end = System.nanoTime(); Assert.assertTrue(String.format("took more time than expected %s was %s", - mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000, - (end-start)), (end - start) <= mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()*1000); + TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()), + (end-start)), (end - start) <= TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds())); } public void testWriteThrottling(boolean shardFound){