X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=f69ae88ec873ed044ab40c36a63e6f0b68b9db67;hb=c675288c3ebd727fc72013e899183995c3fa9be7;hp=14696f786e7e36888be3b5517c14ae4b9779340f;hpb=9228eee6e438894b091f7bee8a4ba7b53286ef8f;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 14696f786e..f69ae88ec8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -9,7 +9,9 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; import akka.dispatch.Futures; + import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; import org.junit.Before; import org.junit.Test; @@ -29,12 +31,15 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -48,10 +53,12 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.any; @@ -62,6 +69,7 @@ import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.times; @SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { @@ -71,7 +79,7 @@ public class TransactionProxyTest extends AbstractActorTest { } static interface Invoker { - void invoke(TransactionProxy proxy) throws Exception; + CheckedFuture invoke(TransactionProxy proxy) throws Exception; } private final Configuration configuration = new MockConfiguration(); @@ -90,6 +98,8 @@ public class TransactionProxyTest extends AbstractActorTest { schemaContext = TestModel.createTestContext(); doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); ShardStrategyFactory.setConfiguration(configuration); } @@ -112,8 +122,8 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - DataExists obj = DataExists.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); } }; @@ -124,8 +134,8 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - ReadData obj = ReadData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); } }; @@ -136,6 +146,10 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { + if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + WriteData obj = WriteData.fromSerializable(argument, schemaContext); return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); @@ -149,6 +163,10 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { + if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + MergeData obj = MergeData.fromSerializable(argument, schemaContext); return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); @@ -162,27 +180,38 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - DeleteData obj = DeleteData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); } }; return argThat(matcher); } - private Object readyTxReply(ActorPath path) { - return new ReadyTransactionReply(path).toSerializable(); + private Future readyTxReply(ActorPath path) { + return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); } private Future readDataReply(NormalizedNode data) { - return Futures.successful(new ReadDataReply(schemaContext, data) - .toSerializable()); + return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable()); } private Future dataExistsReply(boolean exists) { return Futures.successful(new DataExistsReply(exists).toSerializable()); } + private Future writeDataReply() { + return Futures.successful(new WriteDataReply().toSerializable()); + } + + private Future mergeDataReply() { + return Futures.successful(new MergeDataReply().toSerializable()); + } + + private Future deleteDataReply() { + return Futures.successful(new DeleteDataReply().toSerializable()); + } + private ActorSelection actorSelection(ActorRef actorRef) { return getSystem().actorSelection(actorRef.path()); } @@ -201,7 +230,6 @@ public class TransactionProxyTest extends AbstractActorTest { ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); doReturn(getSystem().actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(memberName).when(mockActorContext).getCurrentMemberName(); doReturn(createTransactionReply(actorRef)).when(mockActorContext). executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), eqCreateTransaction(memberName, type), anyDuration()); @@ -212,12 +240,23 @@ public class TransactionProxyTest extends AbstractActorTest { return actorRef; } + private void propagateReadFailedExceptionCause(CheckedFuture future) + throws Throwable { + + try { + future.checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + throw e.getCause(); + } + } + @Test public void testRead() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( eq(actorSelection(actorRef)), eqReadData(), anyDuration()); @@ -240,14 +279,14 @@ public class TransactionProxyTest extends AbstractActorTest { } @Test(expected = ReadFailedException.class) - public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception { + public void testReadWithInvalidReplyMessageType() throws Exception { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @@ -256,19 +295,13 @@ public class TransactionProxyTest extends AbstractActorTest { public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { setupActorContextWithInitialCreateTransaction(READ_ONLY); - doThrow(new TestException()).when(mockActorContext). + doReturn(Futures.failed(new TestException())).when(mockActorContext). executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); - try { - transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); - } + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) @@ -278,22 +311,16 @@ public class TransactionProxyTest extends AbstractActorTest { anyString(), any(), anyDuration()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); - try { - invoker.invoke(transactionProxy); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); - } + propagateReadFailedExceptionCause(invoker.invoke(transactionProxy)); } private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { @Override - public void invoke(TransactionProxy proxy) throws Exception { - proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.read(TestModel.TEST_PATH); } }); } @@ -314,12 +341,77 @@ public class TransactionProxyTest extends AbstractActorTest { testReadWithExceptionOnInitialCreateTransaction(new TestException()); } + @Test(expected = TestException.class) + public void testReadWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(), + anyDuration()); + + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + try { + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + } + } + + @Test + public void testReadWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(expectedNode), anyDuration()); + + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, expectedNode); + + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + } + + @Test(expected=IllegalStateException.class) + public void testReadPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.read(TestModel.TEST_PATH); + } + @Test public void testExists() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); @@ -340,21 +432,21 @@ public class TransactionProxyTest extends AbstractActorTest { public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { @Override - public void invoke(TransactionProxy proxy) throws Exception { - proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.exists(TestModel.TEST_PATH); } }); } @Test(expected = ReadFailedException.class) - public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception { + public void testExistsWithInvalidReplyMessageType() throws Exception { setupActorContextWithInitialCreateTransaction(READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @@ -363,62 +455,206 @@ public class TransactionProxyTest extends AbstractActorTest { public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { setupActorContextWithInitialCreateTransaction(READ_ONLY); - doThrow(new TestException()).when(mockActorContext). + doReturn(Futures.failed(new TestException())).when(mockActorContext). executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); + + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } + + @Test(expected = TestException.class) + public void testExistsWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData(), + anyDuration()); + + doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); try { - transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); } } @Test - public void testWrite() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + + doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); + } + + @Test(expected=IllegalStateException.class) + public void testxistsPreConditionCheck() { TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); + WRITE_ONLY); + + transactionProxy.exists(TestModel.TEST_PATH); + } + + private void verifyRecordingOperationFutures(List> futures, + Class... expResultTypes) throws Exception { + assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size()); + + int i = 0; + for( Future future: futures) { + assertNotNull("Recording operation Future is null", future); + + Class expResultType = expResultTypes[i++]; + if(Throwable.class.isAssignableFrom(expResultType)) { + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from recording operation Future"); + } catch(Exception e) { + // Expected + } + } else { + assertEquals("Recording operation Future result type", expResultType, + Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass()); + } + } + } + + @Test + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).sendRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + verify(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + } + + @Test(expected=IllegalStateException.class) + public void testWritePreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + + @Test(expected=IllegalStateException.class) + public void testWriteAfterReadyPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.ready(); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); } @Test public void testMerge() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); - NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).sendRemoteOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + verify(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); } @Test public void testDelete() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData(), anyDuration()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); + WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - verify(mockActorContext).sendRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDeleteData()); + verify(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData(), anyDuration()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + DeleteDataReply.SERIALIZABLE_CLASS); + } + + private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy, + Object... expReplies) throws Exception { + assertEquals("getReadyOperationFutures size", expReplies.length, + proxy.getCohortPathFutures().size()); + + int i = 0; + for( Future future: proxy.getCohortPathFutures()) { + assertNotNull("Ready operation Future is null", future); + + Object expReply = expReplies[i++]; + if(expReply instanceof ActorPath) { + ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + assertEquals("Cohort actor path", expReply, actual); + } else { + // Expecting exception. + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from ready operation Future"); + } catch(Exception e) { + // Expected + } + } + } } @SuppressWarnings("unchecked") @@ -426,31 +662,162 @@ public class TransactionProxyTest extends AbstractActorTest { public void testReady() throws Exception { ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation( + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE, schemaContext); + READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + + verifyCohortPathFutures(proxy, actorRef.path()); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadyWithRecordingOperationFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration()); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), + anyDuration()); + + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS, TestException.class); + + verifyCohortPathFutures(proxy, TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadyWithReplyFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration()); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); + + verifyCohortPathFutures(proxy, TestException.class); + } + + @Test + public void testReadyWithInitialCreateTransactionFailure() throws Exception { + + doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation( + anyString(), any(), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortPathFutures(proxy, PrimaryNotFoundException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadyWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration()); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths()); + verifyCohortPathFutures(proxy, IllegalArgumentException.class); } @Test public void testGetIdentifier() { setupActorContextWithInitialCreateTransaction(READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - TransactionProxy.TransactionType.READ_ONLY, schemaContext); + TransactionProxy.TransactionType.READ_ONLY); Object id = transactionProxy.getIdentifier(); assertNotNull("getIdentifier returned null", id); @@ -466,7 +833,7 @@ public class TransactionProxyTest extends AbstractActorTest { eq(actorSelection(actorRef)), eqReadData(), anyDuration()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE, schemaContext); + READ_WRITE); transactionProxy.read(TestModel.TEST_PATH);