X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=f2b849122a3c0fced5f8925b0ab6d7e80080152c;hp=a8df49f5ca4514ceb9ea1dfc25809cf789647150;hb=73e969cf365dd78772596c71e940ae44fe2f22d3;hpb=cbe83ca3074fa0182d4f079f528bb710a997ced7 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index a8df49f5ca..f2b849122a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,264 +1,1075 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Props; +import akka.dispatch.Futures; import com.google.common.base.Optional; -import com.google.common.util.concurrent.ListenableFuture; -import junit.framework.Assert; +import com.google.common.util.concurrent.CheckedFuture; +import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; -import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; -import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; -import org.opendaylight.controller.cluster.datastore.utils.MockActorContext; +import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; - +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; + +@SuppressWarnings("resource") public class TransactionProxyTest extends AbstractActorTest { - private ExecutorService transactionExecutor = - Executors.newSingleThreadExecutor(); + @SuppressWarnings("serial") + static class TestException extends RuntimeException { + } + + static interface Invoker { + CheckedFuture invoke(TransactionProxy proxy) throws Exception; + } + + private final Configuration configuration = new MockConfiguration(); + + @Mock + private ActorContext mockActorContext; + + private SchemaContext schemaContext; + + @Mock + private ClusterWrapper mockClusterWrapper; + + String memberName = "mock-member"; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + + schemaContext = TestModel.createTestContext(); + + doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + + ShardStrategyFactory.setConfiguration(configuration); + } + + private CreateTransaction eqCreateTransaction(final String memberName, + final TransactionType type) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + CreateTransaction obj = CreateTransaction.fromSerializable(argument); + return obj.getTransactionId().startsWith(memberName) && + obj.getTransactionType() == type.ordinal(); + } + }; + + return argThat(matcher); + } + + private DataExists eqSerializedDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private DataExists eqDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return (argument instanceof DataExists) && + ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private ReadData eqSerializedReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private ReadData eqReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return (argument instanceof ReadData) && + ((ReadData)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private WriteData eqSerializedWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + + WriteData obj = WriteData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } + + private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(argument instanceof WriteData) { + WriteData obj = (WriteData) argument; + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + return false; + } + }; + + return argThat(matcher); + } + + private MergeData eqSerializedMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + + MergeData obj = MergeData.fromSerializable(argument, schemaContext); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + }; + + return argThat(matcher); + } + + private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(argument instanceof MergeData) { + MergeData obj = ((MergeData) argument); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + + return false; + } + }; + + return argThat(matcher); + } + + private DeleteData eqSerializedDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private DeleteData eqDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return argument instanceof DeleteData && + ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private Future readySerializedTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); + } + + private Future readyTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path)); + } + + + private Future readSerializedDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable()); + } + + private Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data)); + } + + private Future dataExistsSerializedReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists).toSerializable()); + } + + private Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists)); + } + + private Future writeSerializedDataReply() { + return Futures.successful(new WriteDataReply().toSerializable()); + } + + private Future writeDataReply() { + return Futures.successful(new WriteDataReply()); + } + + private Future mergeSerializedDataReply() { + return Futures.successful(new MergeDataReply().toSerializable()); + } + + private Future mergeDataReply() { + return Futures.successful(new MergeDataReply()); + } + + private Future deleteSerializedDataReply() { + return Futures.successful(new DeleteDataReply().toSerializable()); + } + + private Future deleteDataReply() { + return Futures.successful(new DeleteDataReply()); + } + + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); + } + + private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + return CreateTransactionReply.newBuilder() + .setTransactionActorPath(actorRef.path().toString()) + .setTransactionId("txn-1").build(); + } + + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + doReturn(actorSystem.actorSelection(actorRef.path())). + when(mockActorContext).actorSelection(actorRef.path().toString()); + + doReturn(Optional.of(actorSystem.actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + doReturn(createTransactionReply(actorRef)).when(mockActorContext). + executeOperation(eq(actorSystem.actorSelection(actorRef.path())), + eqCreateTransaction(memberName, type)); + + doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString()); + + return actorRef; + } + + private void propagateReadFailedExceptionCause(CheckedFuture future) + throws Throwable { + + try { + future.checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + throw e.getCause(); + } + } @Test public void testRead() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); + + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + } + + @Test(expected = ReadFailedException.class) + public void testReadWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeOperationAsync(any(ActorSelection.class), any()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + + @Test(expected = TestException.class) + public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(any(ActorSelection.class), any()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) + throws Throwable { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); - Optional> normalizedNodeOptional = read.get(); + if (exToThrow instanceof PrimaryNotFoundException) { + doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString()); + } else { + doReturn(Optional.of(getSystem().actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShard(anyString()); + } + doThrow(exToThrow).when(mockActorContext).executeOperation(any(ActorSelection.class), any()); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - ImmutableNodes.containerNode(TestModel.TEST_QNAME))); + propagateReadFailedExceptionCause(invoker.invoke(transactionProxy)); + } - read = transactionProxy.read(TestModel.TEST_PATH); + private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { + testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { + @Override + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.read(TestModel.TEST_PATH); + } + }); + } - normalizedNodeOptional = read.get(); + @Test(expected = PrimaryNotFoundException.class) + public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test")); + } - Assert.assertTrue(normalizedNodeOptional.isPresent()); + @Test(expected = TimeoutException.class) + public void testReadWhenATimeoutExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test", + new Exception("reason"))); + } + + @Test(expected = TestException.class) + public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable { + testReadWithExceptionOnInitialCreateTransaction(new TestException()); + } + + @Test(expected = TestException.class) + public void testReadWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData()); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + try { + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + } } @Test - public void testReadWhenANullIsReturned() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testReadWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode)); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); - ListenableFuture>> read = - transactionProxy.read(TestModel.TEST_PATH); + transactionProxy.write(TestModel.TEST_PATH, expectedNode); - Optional> normalizedNodeOptional = read.get(); + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); - actorContext.setExecuteRemoteOperationResponse(new ReadDataReply( - null)); + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + } - read = transactionProxy.read(TestModel.TEST_PATH); + @Test(expected=IllegalStateException.class) + public void testReadPreConditionCheck() { - normalizedNodeOptional = read.get(); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); - Assert.assertFalse(normalizedNodeOptional.isPresent()); + transactionProxy.read(TestModel.TEST_PATH); } @Test - public void testWrite() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testExists() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); - transactionProxy.write(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + assertEquals("Exists response", false, exists); - Assert.assertNotNull(messages); + doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); - Assert.assertTrue(messages instanceof List); + exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); - List listMessages = (List) messages; + assertEquals("Exists response", true, exists); + } - Assert.assertEquals(1, listMessages.size()); + @Test(expected = PrimaryNotFoundException.class) + public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { + testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { + @Override + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.exists(TestModel.TEST_PATH); + } + }); + } - Assert.assertTrue(listMessages.get(0) instanceof WriteData); + @Test(expected = ReadFailedException.class) + public void testExistsWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeOperationAsync(any(ActorSelection.class), any()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + } + + @Test(expected = TestException.class) + public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(any(ActorSelection.class), any()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } + + @Test(expected = TestException.class) + public void testExistsWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData()); + + doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + try { + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); + } } @Test - public void testMerge() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); - transactionProxy.merge(TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.NAME_QNAME)); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - Assert.assertNotNull(messages); + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); - Assert.assertTrue(messages instanceof List); + assertEquals("Exists response", true, exists); + } + + @Test(expected=IllegalStateException.class) + public void testxistsPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.exists(TestModel.TEST_PATH); + } + + private void verifyRecordingOperationFutures(List> futures, + Class... expResultTypes) throws Exception { + assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size()); + + int i = 0; + for( Future future: futures) { + assertNotNull("Recording operation Future is null", future); + + Class expResultType = expResultTypes[i++]; + if(Throwable.class.isAssignableFrom(expResultType)) { + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from recording operation Future"); + } catch(Exception e) { + // Expected + } + } else { + assertEquals("Recording operation Future result type", expResultType, + Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass()); + } + } + } + + @Test + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + } + + @Test(expected=IllegalStateException.class) + public void testWritePreConditionCheck() { - List listMessages = (List) messages; + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); - Assert.assertEquals(1, listMessages.size()); + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + + @Test(expected=IllegalStateException.class) + public void testWriteAfterReadyPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.ready(); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + + @Test + public void testMerge() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); - Assert.assertTrue(listMessages.get(0) instanceof MergeData); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); } @Test public void testDelete() throws Exception { - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDeleteData()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDeleteData()); - Assert.assertNotNull(messages); + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + DeleteDataReply.SERIALIZABLE_CLASS); + } - Assert.assertTrue(messages instanceof List); + private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, + Object... expReplies) throws Exception { + assertEquals("getReadyOperationFutures size", expReplies.length, + proxy.getCohortFutures().size()); + + int i = 0; + for( Future future: proxy.getCohortFutures()) { + assertNotNull("Ready operation Future is null", future); + + Object expReply = expReplies[i++]; + if(expReply instanceof ActorSelection) { + ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + assertEquals("Cohort actor path", expReply, actual); + } else { + // Expecting exception. + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from ready operation Future"); + } catch(Exception e) { + // Expected + } + } + } + } - List listMessages = (List) messages; + @SuppressWarnings("unchecked") + @Test + public void testReady() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - Assert.assertEquals(1, listMessages.size()); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); - Assert.assertTrue(listMessages.get(0) instanceof DeleteData); + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); } + @SuppressWarnings("unchecked") @Test - public void testReady() throws Exception { - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + public void testReadyWithRecordingOperationFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(false).when(mockActorContext).isLocalPath(actorRef.path().toString()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS, TestException.class); + + verifyCohortFutures(proxy, TestException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadyWithReplyFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef)); - actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path())); + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); - Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0); + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); + verifyCohortFutures(proxy, TestException.class); } @Test - public void testGetIdentifier(){ - final Props props = Props.create(DoNothingActor.class); - final ActorRef doNothingActorRef = getSystem().actorOf(props); + public void testReadyWithInitialCreateTransactionFailure() throws Exception { - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) ); + doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString()); +// doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation( +// anyString(), any()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); - Assert.assertNotNull(transactionProxy.getIdentifier()); + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, PrimaryNotFoundException.class); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadyWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, IllegalArgumentException.class); + } + + @Test + public void testGetIdentifier() { + setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + TransactionProxy.TransactionType.READ_ONLY); + + Object id = transactionProxy.getIdentifier(); + assertNotNull("getIdentifier returned null", id); + assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName)); } + @SuppressWarnings("unchecked") @Test - public void testClose(){ - final Props props = Props.create(MessageCollectorActor.class); - final ActorRef actorRef = getSystem().actorOf(props); + public void testClose() throws Exception{ + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); - final MockActorContext actorContext = new MockActorContext(this.getSystem()); - actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef)); - actorContext.setExecuteRemoteOperationResponse("message"); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); - TransactionProxy transactionProxy = - new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, transactionExecutor); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); transactionProxy.close(); - ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class))); - Object messages = testContext - .executeLocalOperation(actorRef, "messages", - ActorContext.ASK_DURATION); + verify(mockActorContext).sendOperationAsync( + eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); + } + + + /** + * Method to test a local Tx actor. The Tx paths are matched to decide if the + * Tx actor is local or not. This is done by mocking the Tx actor path + * and the caller paths and ensuring that the paths have the remote-address format + * + * Note: Since the default akka provider for test is not a RemoteActorRefProvider, + * the paths returned for the actors for all the tests are not qualified remote paths. + * Hence are treated as non-local/remote actors. In short, all tests except + * few below run for remote actors + * + * @throws Exception + */ + @Test + public void testLocalTxActorRead() throws Exception { + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(createTransactionReply).when(mockActorContext). + executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_ONLY)); - Assert.assertNotNull(messages); + doReturn(true).when(mockActorContext).isLocalPath(actorPath); - Assert.assertTrue(messages instanceof List); + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY); - List listMessages = (List) messages; + // negative test case with null as the reply + doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); - Assert.assertEquals(1, listMessages.size()); + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); - Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction); + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); + + // test case with node as read data reply + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + + // test for local data exists + doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); } - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ - return new CreateTransactionReply(actorRef.path(), "txn-1"); + @Test + public void testLocalTxActorWrite() throws Exception { + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Optional.of(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShard(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(createTransactionReply).when(mockActorContext). + executeOperation(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, WRITE_ONLY)); + + doReturn(true).when(mockActorContext).isLocalPath(actorPath); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + //testing local merge + doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToWrite)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToWrite)); + + + //testing local delete + doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class); + + // testing ready + doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(ReadyTransaction.class)); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path())); } }