X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=14696f786e7e36888be3b5517c14ae4b9779340f;hp=8b70e00da923d92a806c59530c2111b5cfe87b7d;hb=30faeb35260541c273a81b8f126b40da94daa825;hpb=dae79bb685addd04c5745bd056b147a47ec9773f diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 8b70e00da9..14696f786e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,256 +1,478 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import akka.actor.ActorPath; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.Futures; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; -import junit.framework.Assert; + +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; + +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; -import java.util.List; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.isA; +@SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { - @Test - public void testRead() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + @SuppressWarnings("serial") + static class TestException extends RuntimeException { + } + + static interface Invoker { + void invoke(TransactionProxy proxy) throws Exception; + } - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); - actorContext.setExecuteRemoteOperationResponse("message"); + private final Configuration configuration = new MockConfiguration(); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + @Mock + private ActorContext mockActorContext; + private SchemaContext schemaContext; - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + String memberName = "mock-member"; - Optional> normalizedNodeOptional = read.get(); + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + schemaContext = TestModel.createTestContext(); - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - ImmutableNodes.containerNode(TestModel.TEST_QNAME))); + doReturn(getSystem()).when(mockActorContext).getActorSystem(); - read = transactionProxy.read(TestModel.TEST_PATH); + ShardStrategyFactory.setConfiguration(configuration); + } + + private CreateTransaction eqCreateTransaction(final String memberName, + final TransactionType type) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + }; + + return argThat(matcher); + } - normalizedNodeOptional = read.get(); + private DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DataExists obj = DataExists.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; - Assert.assertTrue(normalizedNodeOptional.isPresent()); + return argThat(matcher); } - @Test - public void testReadWhenANullIsReturned() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + private ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + ReadData obj = ReadData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); - actorContext.setExecuteRemoteOperationResponse("message"); + private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + WriteData obj = WriteData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + MergeData obj = MergeData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } + private DeleteData eqDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + DeleteData obj = DeleteData.fromSerializable(argument); + return obj.getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + private Object readyTxReply(ActorPath path) { + return new ReadyTransactionReply(path).toSerializable(); + } - Optional> normalizedNodeOptional = read.get(); + private Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data) + .toSerializable()); + } - Assert.assertFalse(normalizedNodeOptional.isPresent()); + private Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists).toSerializable()); + } - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - null)); + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } - read = transactionProxy.read(TestModel.TEST_PATH); + private FiniteDuration anyDuration() { + return any(FiniteDuration.class); + } - normalizedNodeOptional = read.get(); + private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1").build(); + } - Assert.assertFalse(normalizedNodeOptional.isPresent()); + private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + doReturn(getSystem().actorSelection(actorRef.path())). + when(mockActorContext).actorSelection(actorRef.path().toString()); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(createTransactionReply(actorRef)).when(mockActorContext). + executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), + eqCreateTransaction(memberName, type), anyDuration()); + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( + anyString(), eq(actorRef.path().toString())); + doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); + + return actorRef; } @Test - public void testWrite() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testRead() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); - actorContext.setExecuteRemoteOperationResponse("message"); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - transactionProxy.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); - Assert.assertNotNull(messages); + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - Assert.assertTrue(messages instanceof List); + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - List listMessages = (List) messages; + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - Assert.assertEquals(1, listMessages.size()); + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - Assert.assertTrue(listMessages.get(0) instanceof WriteData); + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); } - @Test - public void testMerge() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + @Test(expected = ReadFailedException.class) + public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - transactionProxy.merge(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + + @Test(expected = TestException.class) + public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); + + doThrow(new TestException()).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); + + try { + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } + } - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) + throws Throwable { - Assert.assertNotNull(messages); + doThrow(exToThrow).when(mockActorContext).executeShardOperation( + anyString(), any(), anyDuration()); - Assert.assertTrue(messages instanceof List); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - List listMessages = (List) messages; + try { + invoker.invoke(transactionProxy); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } + } - Assert.assertEquals(1, listMessages.size()); + private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { + testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { + @Override + public void invoke(TransactionProxy proxy) throws Exception { + proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + }); + } + + @Test(expected = PrimaryNotFoundException.class) + public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test")); + } - Assert.assertTrue(listMessages.get(0) instanceof MergeData); + @Test(expected = TimeoutException.class) + public void testReadWhenATimeoutExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test", + new Exception("reason"))); + } + + @Test(expected = TestException.class) + public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TestException()); } @Test - public void testDelete() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testExists() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); - actorContext.setExecuteRemoteOperationResponse("message"); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); - transactionProxy.delete(TestModel.TEST_PATH); + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", false, exists); + + doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + + exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); + } + + @Test(expected = PrimaryNotFoundException.class) + public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { + @Override + public void invoke(TransactionProxy proxy) throws Exception { + proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + }); + } + + @Test(expected = ReadFailedException.class) + public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - Assert.assertNotNull(messages); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); - Assert.assertTrue(messages instanceof List); + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } - List listMessages = (List) messages; + @Test(expected = TestException.class) + public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { + setupActorContextWithInitialCreateTransaction(READ_ONLY); - Assert.assertEquals(1, listMessages.size()); + doThrow(new TestException()).when(mockActorContext). + executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); - Assert.assertTrue(listMessages.get(0) instanceof DeleteData); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY, schemaContext); + + try { + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + // Expected - throw cause - expects TestException. + throw e.getCause(); + } } @Test - public void testReady() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(doNothingActorRef.path())); - actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path())); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + } - Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + @Test + public void testMerge() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); } @Test - public void testGetIdentifier(){ - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + public void testDelete() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse( - new CreateTransactionReply(doNothingActorRef.path())); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY, schemaContext); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + transactionProxy.delete(TestModel.TEST_PATH); - Assert.assertNotNull(transactionProxy.getIdentifier()); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), eqDeleteData()); } + @SuppressWarnings("unchecked") @Test - public void testClose(){ - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testReady() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path())); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY); + doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); - transactionProxy.close(); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE, schemaContext); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + transactionProxy.read(TestModel.TEST_PATH); - Assert.assertNotNull(messages); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - Assert.assertTrue(messages instanceof List); + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); - List listMessages = (List) messages; + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths()); + } - Assert.assertEquals(1, listMessages.size()); + @Test + public void testGetIdentifier() { + setupActorContextWithInitialCreateTransaction(READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + TransactionProxy.TransactionType.READ_ONLY, schemaContext); + + Object id = transactionProxy.getIdentifier(); + assertNotNull("getIdentifier returned null", id); + assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName)); + } + + @SuppressWarnings("unchecked") + @Test + public void testClose() throws Exception{ + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + + doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( + eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE, schemaContext); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.close(); - Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction); + verify(mockActorContext).sendRemoteOperationAsync( + eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); } }