X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=14696f786e7e36888be3b5517c14ae4b9779340f;hp=f654e3aced738ef0c85a6400babe901d9c98d3ea;hb=30faeb35260541c273a81b8f126b40da94daa825;hpb=ed693440aa741fee9b94447f8404d89b4020f616 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index f654e3aced..14696f786e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,294 +1,478 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.Futures; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; -import junit.framework.Assert; + import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; + +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; -import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.isA; + +@SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { + @SuppressWarnings("serial") + static class TestException extends RuntimeException { + } + + static interface Invoker { + void invoke(TransactionProxy proxy) throws Exception; + } + private final Configuration configuration = new MockConfiguration(); - private final ActorContext testContext = - new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)), new MockClusterWrapper(), configuration ); + @Mock + private ActorContext mockActorContext; - private ExecutorService transactionExecutor = - Executors.newSingleThreadExecutor(); + private SchemaContext schemaContext; + + String memberName = "mock-member"; @Before public void setUp(){ + MockitoAnnotations.initMocks(this); + + schemaContext = TestModel.createTestContext(); + + doReturn(getSystem()).when(mockActorContext).getActorSystem(); + ShardStrategyFactory.setConfiguration(configuration); } - @Test - public void testRead() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + private CreateTransaction eqCreateTransaction(final String memberName, + final TransactionType type) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + }; + + return argThat(matcher); + } + + private DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DataExists obj = DataExists.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + ReadData obj = ReadData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + WriteData obj = WriteData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } + private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + MergeData obj = MergeData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + private DeleteData eqDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DeleteData obj = DeleteData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; + return argThat(matcher); + } - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + private Object readyTxReply(ActorPath path) { + return new ReadyTransactionReply(path).toSerializable(); + } - Optional> normalizedNodeOptional = read.get(); + private Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data) + .toSerializable()); + } - Assert.assertFalse(normalizedNodeOptional.isPresent()); + private Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists).toSerializable()); + } - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - TestModel.createTestContext(),ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable()); + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } - read = transactionProxy.read(TestModel.TEST_PATH); + private FiniteDuration anyDuration() { + return any(FiniteDuration.class); + } - normalizedNodeOptional = read.get(); + private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1").build(); + } - Assert.assertTrue(normalizedNodeOptional.isPresent()); + private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + doReturn(getSystem().actorSelection(actorRef.path())). + when(mockActorContext).actorSelection(actorRef.path().toString()); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(createTransactionReply(actorRef)).when(mockActorContext). + executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), + eqCreateTransaction(memberName, type), anyDuration()); + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( + anyString(), eq(actorRef.path().toString())); + doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); + + return actorRef; } @Test - public void testReadWhenANullIsReturned() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); - - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + public void testRead() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - Optional> normalizedNodeOptional = read.get(); + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - TestModel.createTestContext(), null).toSerializable()); + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - read = transactionProxy.read(TestModel.TEST_PATH); + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - normalizedNodeOptional = read.get(); + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); } - @Test - public void testWrite() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + @Test(expected = ReadFailedException.class) + public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception { + setupActorContextWithInitialCreateTransaction(READ_ONLY); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } - transactionProxy.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + @Test(expected = TestException.class) + public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + doThrow(new TestException()).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - Assert.assertNotNull(messages); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); + + try { + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } + } - Assert.assertTrue(messages instanceof List); + private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) + throws Throwable { - List listMessages = (List) messages; + doThrow(exToThrow).when(mockActorContext).executeShardOperation( + anyString(), any(), anyDuration()); - Assert.assertEquals(1, listMessages.size()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Assert.assertEquals(WriteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + try { + invoker.invoke(transactionProxy); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } } - private Object createPrimaryFound(ActorRef actorRef) { - return new PrimaryFound(actorRef.path().toString()).toSerializable(); + private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { + testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { + @Override + public void invoke(TransactionProxy proxy) throws Exception { + proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + }); } - @Test - public void testMerge() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + @Test(expected = PrimaryNotFoundException.class) + public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test")); + } - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + @Test(expected = TimeoutException.class) + public void testReadWhenATimeoutExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test", + new Exception("reason"))); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + @Test(expected = TestException.class) + public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TestException()); + } - transactionProxy.merge(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + @Test + public void testExists() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Assert.assertNotNull(messages); + doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); - Assert.assertTrue(messages instanceof List); + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); - List listMessages = (List) messages; + assertEquals("Exists response", false, exists); - Assert.assertEquals(1, listMessages.size()); + doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); - Assert.assertEquals(MergeData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); } - @Test - public void testDelete() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + @Test(expected = PrimaryNotFoundException.class) + public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { + @Override + public void invoke(TransactionProxy proxy) throws Exception { + proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + }); + } - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + @Test(expected = ReadFailedException.class) + public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - transactionProxy.delete(TestModel.TEST_PATH); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); - - Assert.assertNotNull(messages); + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } - Assert.assertTrue(messages instanceof List); + @Test(expected = TestException.class) + public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - List listMessages = (List) messages; + doThrow(new TestException()).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - Assert.assertEquals(1, listMessages.size()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Assert.assertEquals(DeleteData.SERIALIZABLE_CLASS, listMessages.get(0).getClass()); + try { + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } } @Test - public void testReady() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(doNothingActorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef)); - actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()).toSerializable()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - transactionProxy.read(TestModel.TEST_PATH); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + } - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + @Test + public void testMerge() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); } @Test - public void testGetIdentifier(){ - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + public void testDelete() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) ); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + transactionProxy.delete(TestModel.TEST_PATH); - Assert.assertNotNull(transactionProxy.getIdentifier()); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData()); } + @SuppressWarnings("unchecked") @Test - public void testClose(){ - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testReady() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteLocalOperationResponse(createPrimaryFound(actorRef)); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor, TestModel.createTestContext()); + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE, schemaContext); transactionProxy.read(TestModel.TEST_PATH); - transactionProxy.close(); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths()); + } - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + @Test + public void testGetIdentifier() { + setupActorContextWithInitialCreateTransaction(READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + TransactionProxy.TransactionType.READ_ONLY, schemaContext); + + Object id = transactionProxy.getIdentifier(); + assertNotNull("getIdentifier returned null", id); + assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName)); + } - Assert.assertNotNull(messages); + @SuppressWarnings("unchecked") + @Test + public void testClose() throws Exception{ + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - Assert.assertTrue(messages instanceof List); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - List listMessages = (List) messages; + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE, schemaContext); - Assert.assertEquals(1, listMessages.size()); + transactionProxy.read(TestModel.TEST_PATH); - Assert.assertTrue(listMessages.get(0).getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)); - } + transactionProxy.close(); - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ - return CreateTransactionReply.newBuilder() - .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1") - .build(); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); } }