X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=844feb2f47e988b89498a80946d97aecbff8c049;hb=9c34ce103df5efac991297dc25a64c9b8d6019f3;hp=29a5b09c5c8e33fcb8892054564d4264dd2a711d;hpb=4e594bb578f687fb24e242e90b918d91fb59c3ac;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 29a5b09c5c..844feb2f47 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -3,7 +3,6 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -41,8 +40,8 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; -import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; @@ -61,10 +60,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.Promise; -import scala.concurrent.duration.Duration; @SuppressWarnings("resource") public class TransactionProxyTest extends AbstractTransactionProxyTest { @@ -134,7 +130,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { if (exToThrow instanceof PrimaryNotFoundException) { doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); } else { - doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), actorRef)). when(mockActorContext).findPrimaryShardAsync(anyString()); } @@ -213,7 +209,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext). actorSelection(actorRef.path().toString()); - doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), actorRef)). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( @@ -313,29 +309,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { transactionProxy.exists(TestModel.TEST_PATH); } - private void verifyRecordingOperationFutures(List> futures, - Class... expResultTypes) throws Exception { - assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size()); - - int i = 0; - for( Future future: futures) { - assertNotNull("Recording operation Future is null", future); - - Class expResultType = expResultTypes[i++]; - if(Throwable.class.isAssignableFrom(expResultType)) { - try { - Await.result(future, Duration.create(5, TimeUnit.SECONDS)); - fail("Expected exception from recording operation Future"); - } catch(Exception e) { - // Expected - } - } else { - assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType, - Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass()); - } - } - } - @Test public void testWrite() throws Exception { dataStoreContextBuilder.shardBatchedModificationCount(1); @@ -354,7 +327,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteAfterAsyncRead() throws Throwable { - ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem()); + ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(), DefaultShardStrategy.DEFAULT_SHARD); Promise createTxPromise = akka.dispatch.Futures.promise(); doReturn(createTxPromise).when(mockActorContext).executeOperationAsync( @@ -364,8 +337,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedReadData()); - expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); + expectBatchedModificationsReady(actorRef); final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -404,10 +376,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { // This sends the batched modification. transactionProxy.ready(); - verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true); } @Test(expected=IllegalStateException.class) @@ -456,7 +425,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } @Test - public void testReadyWithReadWrite() throws Exception { + public void testReadWrite() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -465,7 +434,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { eq(actorSelection(actorRef)), eqSerializedReadData()); expectBatchedModifications(actorRef, 1); - expectReadyTransaction(actorRef); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); @@ -473,66 +441,105 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); + transactionProxy.read(TestModel.TEST_PATH); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + transactionProxy.read(TestModel.TEST_PATH); - verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), - isA(BatchedModifications.class)); + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + verifyBatchedModifications(batchedModifications.get(0), false, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); } @Test - public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception { - dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + public void testReadyWithReadWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef, 1); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + expectBatchedModificationsReady(actorRef, true); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof SingleCommitCohortProxy); + + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); + + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), true, true, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + + assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent()); + } + + @Test + public void testReadyWithNoModifications() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + expectBatchedModificationsReady(actorRef, true); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures()); + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), true, - new WriteModification(TestModel.TEST_PATH, nodeToWrite)); + verifyBatchedModifications(batchedModifications.get(0), true, true); + } - verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), - isA(ReadyTransaction.SERIALIZABLE_CLASS)); + @Test + public void testReadyWithMultipleShardWrites() throws Exception { + ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); + + expectBatchedModificationsReady(actorRef1); + expectBatchedModificationsReady(actorRef2); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + + transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1), + actorSelection(actorRef2)); } @Test - public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception { - dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); + public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception { + dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectBatchedModificationsReady(actorRef, 1); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -540,38 +547,28 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof SingleCommitCohortProxy); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class); - - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); List batchedModifications = captureBatchedModifications(actorRef); - assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); + assertEquals("Captured BatchedModifications count", 1, batchedModifications.size()); - verifyBatchedModifications(batchedModifications.get(0), false, + verifyBatchedModifications(batchedModifications.get(0), true, true, new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - verifyBatchedModifications(batchedModifications.get(1), true); - verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); } @Test - public void testReadyWithRecordingOperationFailure() throws Exception { + public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception { dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true); - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - expectFailedBatchedModifications(actorRef); - - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); @@ -579,13 +576,20 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof SingleCommitCohortProxy); + + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + List batchedModifications = captureBatchedModifications(actorRef); + assertEquals("Captured BatchedModifications count", 2, batchedModifications.size()); + + verifyBatchedModifications(batchedModifications.get(0), false, + new WriteModification(TestModel.TEST_PATH, nodeToWrite)); - verifyCohortFutures(proxy, TestException.class); + verifyBatchedModifications(batchedModifications.get(1), true, true); - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class); + verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); } @Test @@ -604,11 +608,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, TestException.class); + verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class); } private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception { @@ -626,11 +628,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, toThrow.getClass()); + verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass()); } @Test @@ -651,27 +651,26 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyWithInvalidReplyMessageType() throws Exception { dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - - //expectBatchedModifications(actorRef, 1); + ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY, "junk"); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeOperationAsync(eq(actorSelection(actorRef)), - isA(BatchedModifications.class)); + executeOperationAsync(eq(actorSelection(actorRef1)), isA(BatchedModifications.class)); + + expectBatchedModificationsReady(actorRef2); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); - transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME)); + transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - - verifyCohortFutures(proxy, IllegalArgumentException.class); + verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2), + IllegalArgumentException.class); } @Test @@ -757,25 +756,18 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); doReturn(true).when(mockActorContext).isPathLocal(anyString()); - doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync( - any(ActorSelection.class), isA(BatchedModifications.class)); + expectBatchedModificationsReady(actorRef, true); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - // testing ready - doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( - eq(actorSelection(actorRef)), isA(ReadyTransaction.class)); - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + assertTrue(ready instanceof SingleCommitCohortProxy); - verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable()); } private static interface TransactionProxyOperation { @@ -842,7 +834,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { when(mockActorContext).actorSelection(shardActorRef.path().toString()); if(shardFound) { - doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + doReturn(primaryShardInfoReply(actorSystem, shardActorRef)). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); } else { doReturn(Futures.failed(new PrimaryNotFoundException("test"))) @@ -1196,8 +1188,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { expectBatchedModifications(actorRef, shardBatchedModificationCount); - expectReadyTransaction(actorRef); - YangInstanceIdentifier writePath1 = TestModel.TEST_PATH; NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME); @@ -1242,17 +1232,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1), new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3)); - boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled(); - verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3), - new DeleteModification(deletePath2)); + verifyBatchedModifications(batchedModifications.get(2), true, true, + new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2)); - if(optimizedWriteOnly) { - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class); - } else { - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); - } + assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent()); } @Test @@ -1357,9 +1340,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { inOrder.verify(mockActorContext).executeOperationAsync( eq(actorSelection(actorRef)), eqSerializedDataExists()); - - verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), - BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class); } @Test @@ -1419,7 +1399,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(getSystem().actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); - doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))). + doReturn(primaryShardInfoReply(getSystem(), shardActorRef)). when(mockActorContext).findPrimaryShardAsync(eq(shardName)); doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());