X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=dd55441c2a22a014d725674379c071c990c051cf;hb=refs%2Fchanges%2F89%2F19089%2F8;hp=cc9692bfd91b72f3245f2bb6bd160f12551720b5;hpb=9f1061c46af5220ad95d8d0b94411ba2fd832a50;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index cc9692bfd9..dd55441c2a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -11,9 +11,9 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; @@ -34,13 +34,13 @@ import org.junit.Assert; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; -import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -129,7 +129,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { if (exToThrow instanceof PrimaryNotFoundException) { doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); } else { - doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), actorRef)). when(mockActorContext).findPrimaryShardAsync(anyString()); } @@ -208,7 +208,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext). actorSelection(actorRef.path().toString()); - doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), actorRef)). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( @@ -326,7 +326,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteAfterAsyncRead() throws Throwable { - ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem()); + ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD); Promise createTxPromise = akka.dispatch.Futures.promise(); doReturn(createTxPromise).when(mockActorContext).executeOperationAsync( @@ -460,7 +460,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -470,16 +470,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), true, + verifyBatchedModifications(batchedModifications.get(0), true, true, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent()); @@ -492,7 +490,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -500,16 +498,36 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), true); + verifyBatchedModifications(batchedModifications.get(0), true, true); + } + + @Test + public void testReadyWithMultipleShardWrites() throws Exception { + ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); + + expectBatchedModificationsReady(actorRef1); + expectBatchedModificationsReady(actorRef2); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1), + actorSelection(actorRef2)); } @Test @@ -520,7 +538,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -528,16 +546,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), true, + verifyBatchedModifications(batchedModifications.get(0), true, true, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), @@ -551,7 +567,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -559,11 +575,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); @@ -571,7 +585,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - verifyBatchedModifications(batchedModifications.get(1), true); + verifyBatchedModifications(batchedModifications.get(1), true, true); verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); @@ -593,11 +607,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, TestException.class); + verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class); } private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception { @@ -615,11 +627,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof SingleCommitCohortProxy); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyCohortFutures(proxy, toThrow.getClass()); + verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass()); } @Test @@ -640,34 +650,33 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyWithInvalidReplyMessageType() throws Exception { dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - - //expectBatchedModifications(actorRef, 1); + ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), - isA(BatchedModifications.class)); + executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class)); + + expectBatchedModificationsReady(actorRef2); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); - transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyCohortFutures(proxy, IllegalArgumentException.class); + verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2), + IllegalArgumentException.class); } @Test public void testGetIdentifier() { setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionType.READ_ONLY); Object id = transactionProxy.getIdentifier(); assertNotNull("getIdentifier returned null", id); @@ -746,7 +755,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); doReturn(true).when(mockActorContext).isPathLocal(anyString()); - expectBatchedModificationsReady(actorRef); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -755,11 +764,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof SingleCommitCohortProxy); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); } private static interface TransactionProxyOperation { @@ -826,7 +833,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { when(mockActorContext).actorSelection(shardActorRef.path().toString()); if(shardFound) { - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + doReturn(primaryShardInfoReply(actorSystem, shardActorRef)). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } else { doReturn(Futures.failed(new PrimaryNotFoundException("test"))) @@ -1224,8 +1231,8 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); - verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3), - new DeleteModification(deletePath2)); + verifyBatchedModifications(batchedModifications.get(2), true, true, + new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent()); } @@ -1391,7 +1398,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); - doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), shardActorRef)). when(mockActorContext).findPrimaryShardAsync(eq(shardName)); doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());