X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=e5392e025158704f44d152306aaa727b64d460e8;hp=14696f786e7e36888be3b5517c14ae4b9779340f;hb=b10d77375b5a290143106180f1583ea4e18f8478;hpb=63b36aa3537d77bd9be323e1113716ef2cd54098 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 14696f786e..e5392e0251 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -9,7 +9,9 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; import akka.dispatch.Futures; + import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import org.junit.Before; import org.junit.Test; @@ -29,12 +31,15 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -48,10 +53,10 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.Arrays; +import scala.concurrent.duration.Duration; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.any; @@ -62,6 +67,7 @@ import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.times; @SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { @@ -71,7 +77,7 @@ public class TransactionProxyTest extends AbstractActorTest { } static interface Invoker { - void invoke(TransactionProxy proxy) throws Exception; + CheckedFuture invoke(TransactionProxy proxy) throws Exception; } private final Configuration configuration = new MockConfiguration(); @@ -90,6 +96,8 @@ public class TransactionProxyTest extends AbstractActorTest { schemaContext = TestModel.createTestContext(); doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); ShardStrategyFactory.setConfiguration(configuration); } @@ -112,8 +120,8 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - DataExists obj = DataExists.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); } }; @@ -124,8 +132,8 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - ReadData obj = ReadData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); } }; @@ -136,6 +144,10 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { + if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + WriteData obj = WriteData.fromSerializable(argument, schemaContext); return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); @@ -149,6 +161,10 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { + if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + MergeData obj = MergeData.fromSerializable(argument, schemaContext); return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); @@ -162,33 +178,40 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - DeleteData obj = DeleteData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); } }; return argThat(matcher); } - private Object readyTxReply(ActorPath path) { - return new ReadyTransactionReply(path).toSerializable(); + private Future readyTxReply(ActorPath path) { + return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); } private Future readDataReply(NormalizedNode data) { - return Futures.successful(new ReadDataReply(schemaContext, data) - .toSerializable()); + return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable()); } private Future dataExistsReply(boolean exists) { return Futures.successful(new DataExistsReply(exists).toSerializable()); } - private ActorSelection actorSelection(ActorRef actorRef) { - return getSystem().actorSelection(actorRef.path()); + private Future writeDataReply() { + return Futures.successful(new WriteDataReply().toSerializable()); + } + + private Future mergeDataReply() { + return Futures.successful(new MergeDataReply().toSerializable()); + } + + private Future deleteDataReply() { + return Futures.successful(new DeleteDataReply().toSerializable()); } - private FiniteDuration anyDuration() { - return any(FiniteDuration.class); + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); } private CreateTransactionReply createTransactionReply(ActorRef actorRef){ @@ -201,10 +224,9 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); doReturn(getSystem().actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(createTransactionReply(actorRef)).when(mockActorContext). executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), - eqCreateTransaction(memberName, type), anyDuration()); + eqCreateTransaction(memberName, type)); doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( anyString(), eq(actorRef.path().toString())); doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); @@ -212,15 +234,26 @@ public class TransactionProxyTest extends AbstractActorTest { return actorRef; } + private void propagateReadFailedExceptionCause(CheckedFuture future) + throws Throwable { + + try { + future.checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + throw e.getCause(); + } + } + @Test public void testRead() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); Optional> readOptional = transactionProxy.read( TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -230,7 +263,7 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -240,14 +273,14 @@ public class TransactionProxyTest extends AbstractActorTest { } @Test(expected = ReadFailedException.class) - public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception { + public void testReadWithInvalidReplyMessageType() throws Exception { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + executeRemoteOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @@ -256,44 +289,32 @@ public class TransactionProxyTest extends AbstractActorTest { public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { setupActorContextWithInitialCreateTransaction(READ_ONLY); - doThrow(new TestException()).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); - try { - transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); - } + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) throws Throwable { doThrow(exToThrow).when(mockActorContext).executeShardOperation( - anyString(), any(), anyDuration()); + anyString(), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); - try { - invoker.invoke(transactionProxy); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); - } + propagateReadFailedExceptionCause(invoker.invoke(transactionProxy)); } private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { @Override - public void invoke(TransactionProxy proxy) throws Exception { - proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.read(TestModel.TEST_PATH); } }); } @@ -314,22 +335,86 @@ public class TransactionProxyTest extends AbstractActorTest { testReadWithExceptionOnInitialCreateTransaction(new TestException()); } + @Test(expected = TestException.class) + public void testReadWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); + + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + try { + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + } + } + + @Test + public void testReadWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(expectedNode)); + + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, expectedNode); + + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + } + + @Test(expected=IllegalStateException.class) + public void testReadPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.read(TestModel.TEST_PATH); + } + @Test public void testExists() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + eq(actorSelection(actorRef)), eqDataExists()); Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); assertEquals("Exists response", false, exists); doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + eq(actorSelection(actorRef)), eqDataExists()); exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); @@ -340,21 +425,21 @@ public class TransactionProxyTest extends AbstractActorTest { public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { @Override - public void invoke(TransactionProxy proxy) throws Exception { - proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.exists(TestModel.TEST_PATH); } }); } @Test(expected = ReadFailedException.class) - public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception { + public void testExistsWithInvalidReplyMessageType() throws Exception { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + executeRemoteOperationAsync(any(ActorSelection.class), any()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @@ -363,62 +448,205 @@ public class TransactionProxyTest extends AbstractActorTest { public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { setupActorContextWithInitialCreateTransaction(READ_ONLY); - doThrow(new TestException()).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } + + @Test(expected = TestException.class) + public void testExistsWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); + + doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); try { - transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists()); } } @Test - public void testWrite() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); + } + + @Test(expected=IllegalStateException.class) + public void testxistsPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.exists(TestModel.TEST_PATH); + } + + private void verifyRecordingOperationFutures(List> futures, + Class... expResultTypes) throws Exception { + assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size()); + + int i = 0; + for( Future future: futures) { + assertNotNull("Recording operation Future is null", future); + + Class expResultType = expResultTypes[i++]; + if(Throwable.class.isAssignableFrom(expResultType)) { + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from recording operation Future"); + } catch(Exception e) { + // Expected + } + } else { + assertEquals("Recording operation Future result type", expResultType, + Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass()); + } + } + } + + @Test + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).sendRemoteOperationAsync( + verify(mockActorContext).executeRemoteOperationAsync( eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + } + + @Test(expected=IllegalStateException.class) + public void testWritePreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + + @Test(expected=IllegalStateException.class) + public void testWriteAfterReadyPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.ready(); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); } @Test public void testMerge() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).sendRemoteOperationAsync( + verify(mockActorContext).executeRemoteOperationAsync( eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); } @Test public void testDelete() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); + WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - verify(mockActorContext).sendRemoteOperationAsync( + verify(mockActorContext).executeRemoteOperationAsync( eq(actorSelection(actorRef)), eqDeleteData()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + DeleteDataReply.SERIALIZABLE_CLASS); + } + + private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy, + Object... expReplies) throws Exception { + assertEquals("getReadyOperationFutures size", expReplies.length, + proxy.getCohortPathFutures().size()); + + int i = 0; + for( Future future: proxy.getCohortPathFutures()) { + assertNotNull("Ready operation Future is null", future); + + Object expReply = expReplies[i++]; + if(expReply instanceof ActorPath) { + ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + assertEquals("Cohort actor path", expReply, actual); + } else { + // Expecting exception. + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from ready operation Future"); + } catch(Exception e) { + // Expected + } + } + } } @SuppressWarnings("unchecked") @@ -426,31 +654,161 @@ public class TransactionProxyTest extends AbstractActorTest { public void testReady() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); - doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE, schemaContext); + READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + + verifyCohortPathFutures(proxy, actorRef.path()); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadyWithRecordingOperationFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS, TestException.class); + + verifyCohortPathFutures(proxy, TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadyWithReplyFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); + + verifyCohortPathFutures(proxy, TestException.class); + } + + @Test + public void testReadyWithInitialCreateTransactionFailure() throws Exception { + + doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation( + anyString(), any()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortPathFutures(proxy, PrimaryNotFoundException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadyWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths()); + verifyCohortPathFutures(proxy, IllegalArgumentException.class); } @Test public void testGetIdentifier() { setupActorContextWithInitialCreateTransaction(READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - TransactionProxy.TransactionType.READ_ONLY, schemaContext); + TransactionProxy.TransactionType.READ_ONLY); Object id = transactionProxy.getIdentifier(); assertNotNull("getIdentifier returned null", id); @@ -463,10 +821,10 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + eq(actorSelection(actorRef)), eqReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE, schemaContext); + READ_WRITE); transactionProxy.read(TestModel.TEST_PATH);