X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=e5392e025158704f44d152306aaa727b64d460e8;hp=fa32c87e8f60ea18657c19efe2ee9f8d9d7019e5;hb=2dc333588d0c15eb7f2df6223dcdcc15e05b077e;hpb=11031657dfdf7632e63772223e3c07f557a4f491 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index fa32c87e8f..e5392e0251 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,267 +1,836 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.Futures; + import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; -import junit.framework.Assert; +import com.google.common.util.concurrent.CheckedFuture; + +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; + +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.times; + +@SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { - private ExecutorService transactionExecutor = - Executors.newSingleThreadExecutor(); + @SuppressWarnings("serial") + static class TestException extends RuntimeException { + } + + static interface Invoker { + CheckedFuture invoke(TransactionProxy proxy) throws Exception; + } + + private final Configuration configuration = new MockConfiguration(); + + @Mock + private ActorContext mockActorContext; + + private SchemaContext schemaContext; + + String memberName = "mock-member"; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + + schemaContext = TestModel.createTestContext(); + + doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + + ShardStrategyFactory.setConfiguration(configuration); + } + + private CreateTransaction eqCreateTransaction(final String memberName, + final TransactionType type) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + }; + + return argThat(matcher); + } + + private DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + + WriteData obj = WriteData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } + + private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + + MergeData obj = MergeData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } + + private DeleteData eqDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private Future readyTxReply(ActorPath path) { + return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); + } + + private Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable()); + } + + private Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists).toSerializable()); + } + + private Future writeDataReply() { + return Futures.successful(new WriteDataReply().toSerializable()); + } + + private Future mergeDataReply() { + return Futures.successful(new MergeDataReply().toSerializable()); + } + + private Future deleteDataReply() { + return Futures.successful(new DeleteDataReply().toSerializable()); + } + + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } + + private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1").build(); + } + + private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + doReturn(getSystem().actorSelection(actorRef.path())). + when(mockActorContext).actorSelection(actorRef.path().toString()); + doReturn(createTransactionReply(actorRef)).when(mockActorContext). + executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), + eqCreateTransaction(memberName, type)); + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( + anyString(), eq(actorRef.path().toString())); + doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); + + return actorRef; + } + + private void propagateReadFailedExceptionCause(CheckedFuture future) + throws Throwable { + + try { + future.checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + throw e.getCause(); + } + } @Test public void testRead() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - Optional> normalizedNodeOptional = read.get(); + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + } + + @Test(expected = ReadFailedException.class) + public void testReadWithInvalidReplyMessageType() throws Exception { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any()); - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - ImmutableNodes.containerNode(TestModel.TEST_QNAME))); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } - read = transactionProxy.read(TestModel.TEST_PATH); + @Test(expected = TestException.class) + public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - normalizedNodeOptional = read.get(); + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any()); - Assert.assertTrue(normalizedNodeOptional.isPresent()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } + + private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) + throws Throwable { + + doThrow(exToThrow).when(mockActorContext).executeShardOperation( + anyString(), any()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + propagateReadFailedExceptionCause(invoker.invoke(transactionProxy)); + } + + private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { + testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { + @Override + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.read(TestModel.TEST_PATH); + } + }); + } + + @Test(expected = PrimaryNotFoundException.class) + public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test")); + } + + @Test(expected = TimeoutException.class) + public void testReadWhenATimeoutExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test", + new Exception("reason"))); + } + + @Test(expected = TestException.class) + public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TestException()); + } + + @Test(expected = TestException.class) + public void testReadWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); + + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + try { + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + } } @Test - public void testReadWhenANullIsReturned() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testReadWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(expectedNode)); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + transactionProxy.write(TestModel.TEST_PATH, expectedNode); - Optional> normalizedNodeOptional = read.get(); + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - null)); + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + } - read = transactionProxy.read(TestModel.TEST_PATH); + @Test(expected=IllegalStateException.class) + public void testReadPreConditionCheck() { - normalizedNodeOptional = read.get(); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + transactionProxy.read(TestModel.TEST_PATH); } @Test - public void testWrite() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testExists() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists()); - transactionProxy.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", false, exists); + + doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists()); + + exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); + } + + @Test(expected = PrimaryNotFoundException.class) + public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { + @Override + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.exists(TestModel.TEST_PATH); + } + }); + } + + @Test(expected = ReadFailedException.class) + public void testExistsWithInvalidReplyMessageType() throws Exception { + setupActorContextWithInitialCreateTransaction(READ_ONLY); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + + @Test(expected = TestException.class) + public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any()); - Assert.assertNotNull(messages); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); - Assert.assertTrue(messages instanceof List); + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } + + @Test(expected = TestException.class) + public void testExistsWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqDeleteData()); + + doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); - List listMessages = (List) messages; + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - Assert.assertEquals(1, listMessages.size()); + transactionProxy.delete(TestModel.TEST_PATH); - Assert.assertTrue(listMessages.get(0) instanceof WriteData); + try { + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists()); + } } @Test - public void testMerge() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); - transactionProxy.merge(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists()); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); - Assert.assertNotNull(messages); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - Assert.assertTrue(messages instanceof List); + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); + } - List listMessages = (List) messages; + @Test(expected=IllegalStateException.class) + public void testxistsPreConditionCheck() { - Assert.assertEquals(1, listMessages.size()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); - Assert.assertTrue(listMessages.get(0) instanceof MergeData); + transactionProxy.exists(TestModel.TEST_PATH); + } + + private void verifyRecordingOperationFutures(List> futures, + Class... expResultTypes) throws Exception { + assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size()); + + int i = 0; + for( Future future: futures) { + assertNotNull("Recording operation Future is null", future); + + Class expResultType = expResultTypes[i++]; + if(Throwable.class.isAssignableFrom(expResultType)) { + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from recording operation Future"); + } catch(Exception e) { + // Expected + } + } else { + assertEquals("Recording operation Future result type", expResultType, + Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass()); + } + } + } + + @Test + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + } + + @Test(expected=IllegalStateException.class) + public void testWritePreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + + @Test(expected=IllegalStateException.class) + public void testWriteAfterReadyPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.ready(); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + + @Test + public void testMerge() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); } @Test public void testDelete() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(deleteDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + verify(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + DeleteDataReply.SERIALIZABLE_CLASS); + } + + private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy, + Object... expReplies) throws Exception { + assertEquals("getReadyOperationFutures size", expReplies.length, + proxy.getCohortPathFutures().size()); + + int i = 0; + for( Future future: proxy.getCohortPathFutures()) { + assertNotNull("Ready operation Future is null", future); + + Object expReply = expReplies[i++]; + if(expReply instanceof ActorPath) { + ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + assertEquals("Cohort actor path", expReply, actual); + } else { + // Expecting exception. + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from ready operation Future"); + } catch(Exception e) { + // Expected + } + } + } + } + + @SuppressWarnings("unchecked") + @Test + public void testReady() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); - Assert.assertNotNull(messages); + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); - Assert.assertTrue(messages instanceof List); + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - List listMessages = (List) messages; + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); - Assert.assertEquals(1, listMessages.size()); + transactionProxy.read(TestModel.TEST_PATH); - Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + + verifyCohortPathFutures(proxy, actorRef.path()); } + @SuppressWarnings("unchecked") @Test - public void testReady() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + public void testReadyWithRecordingOperationFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef)); - actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path())); + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0); + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS, TestException.class); + verifyCohortPathFutures(proxy, TestException.class); } + @SuppressWarnings("unchecked") @Test - public void testGetIdentifier(){ - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + public void testReadyWithReplyFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) ); + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); - Assert.assertNotNull(transactionProxy.getIdentifier()); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); + + verifyCohortPathFutures(proxy, TestException.class); } @Test - public void testClose(){ - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testReadyWithInitialCreateTransactionFailure() throws Exception { - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation( + anyString(), any()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); - transactionProxy.close(); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - Assert.assertNotNull(messages); + transactionProxy.delete(TestModel.TEST_PATH); - Assert.assertTrue(messages instanceof List); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - List listMessages = (List) messages; + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - Assert.assertEquals(1, listMessages.size()); + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction); + verifyCohortPathFutures(proxy, PrimaryNotFoundException.class); } - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ - return CreateTransactionReply.newBuilder() - .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1") - .build(); + @SuppressWarnings("unchecked") + @Test + public void testReadyWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortPathFutures(proxy, IllegalArgumentException.class); + } + + @Test + public void testGetIdentifier() { + setupActorContextWithInitialCreateTransaction(READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + TransactionProxy.TransactionType.READ_ONLY); + + Object id = transactionProxy.getIdentifier(); + assertNotNull("getIdentifier returned null", id); + assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName)); + } + + @SuppressWarnings("unchecked") + @Test + public void testClose() throws Exception{ + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.close(); + + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); } }