X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxyTest.java;h=7407897dfac504872e37d35c374397369a8c5576;hp=14696f786e7e36888be3b5517c14ae4b9779340f;hb=0133fc851489ac7ea5f0ca6413175fc8b7fa485b;hpb=aa83d1e4b013eecde3c79b00cca538bd4763c2d4 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 14696f786e..7407897dfa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -1,26 +1,23 @@ package org.opendaylight.controller.cluster.datastore; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; -import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.dispatch.Futures; +import akka.testkit.JavaTestKit; import com.google.common.base.Optional; - +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.CheckedFuture; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.MockitoAnnotations; - -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; -import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; - import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException; @@ -29,12 +26,15 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply; import org.opendaylight.controller.cluster.datastore.messages.MergeData; +import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -47,33 +47,41 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; - +import scala.concurrent.Await; import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.Arrays; +import scala.concurrent.duration.Duration; +import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_WRITE; +import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.WRITE_ONLY; @SuppressWarnings("resource") -public class TransactionProxyTest extends AbstractActorTest { +public class TransactionProxyTest { @SuppressWarnings("serial") static class TestException extends RuntimeException { } static interface Invoker { - void invoke(TransactionProxy proxy) throws Exception; + CheckedFuture invoke(TransactionProxy proxy) throws Exception; } + private static ActorSystem system; + private final Configuration configuration = new MockConfiguration(); @Mock @@ -81,19 +89,49 @@ public class TransactionProxyTest extends AbstractActorTest { private SchemaContext schemaContext; + @Mock + private ClusterWrapper mockClusterWrapper; + String memberName = "mock-member"; + @BeforeClass + public static void setUpClass() throws IOException { + + Config config = ConfigFactory.parseMap(ImmutableMap.builder(). + put("akka.actor.default-dispatcher.type", + "akka.testkit.CallingThreadDispatcherConfigurator").build()). + withFallback(ConfigFactory.load()); + system = ActorSystem.create("test", config); + } + + @AfterClass + public static void tearDownClass() throws IOException { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + @Before public void setUp(){ MockitoAnnotations.initMocks(this); schemaContext = TestModel.createTestContext(); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().build(); + doReturn(getSystem()).when(mockActorContext).getActorSystem(); + doReturn(memberName).when(mockActorContext).getCurrentMemberName(); + doReturn(schemaContext).when(mockActorContext).getSchemaContext(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); + doReturn(dataStoreContext).when(mockActorContext).getDatastoreContext(); ShardStrategyFactory.setConfiguration(configuration); } + private ActorSystem getSystem() { + return system; + } + private CreateTransaction eqCreateTransaction(final String memberName, final TransactionType type) { ArgumentMatcher matcher = new ArgumentMatcher() { @@ -108,12 +146,36 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } + private DataExists eqSerializedDataExists() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return DataExists.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DataExists.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + private DataExists eqDataExists() { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - DataExists obj = DataExists.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return (argument instanceof DataExists) && + ((DataExists)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private ReadData eqSerializedReadData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return ReadData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + ReadData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); } }; @@ -124,18 +186,22 @@ public class TransactionProxyTest extends AbstractActorTest { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - ReadData obj = ReadData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return (argument instanceof ReadData) && + ((ReadData)argument).getPath().equals(TestModel.TEST_PATH); } }; return argThat(matcher); } - private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + private WriteData eqSerializedWriteData(final NormalizedNode nodeToWrite) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { + if(!WriteData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + WriteData obj = WriteData.fromSerializable(argument, schemaContext); return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); @@ -145,10 +211,30 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } - private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + private WriteData eqWriteData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(argument instanceof WriteData) { + WriteData obj = (WriteData) argument; + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + return false; + } + }; + + return argThat(matcher); + } + + private MergeData eqSerializedMergeData(final NormalizedNode nodeToWrite) { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { + if(!MergeData.SERIALIZABLE_CLASS.equals(argument.getClass())) { + return false; + } + MergeData obj = MergeData.fromSerializable(argument, schemaContext); return obj.getPath().equals(TestModel.TEST_PATH) && obj.getData().equals(nodeToWrite); @@ -158,69 +244,150 @@ public class TransactionProxyTest extends AbstractActorTest { return argThat(matcher); } - private DeleteData eqDeleteData() { + private MergeData eqMergeData(final NormalizedNode nodeToWrite) { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + if(argument instanceof MergeData) { + MergeData obj = ((MergeData) argument); + return obj.getPath().equals(TestModel.TEST_PATH) && + obj.getData().equals(nodeToWrite); + } + + return false; + } + }; + + return argThat(matcher); + } + + private DeleteData eqSerializedDeleteData() { ArgumentMatcher matcher = new ArgumentMatcher() { @Override public boolean matches(Object argument) { - DeleteData obj = DeleteData.fromSerializable(argument); - return obj.getPath().equals(TestModel.TEST_PATH); + return DeleteData.SERIALIZABLE_CLASS.equals(argument.getClass()) && + DeleteData.fromSerializable(argument).getPath().equals(TestModel.TEST_PATH); } }; return argThat(matcher); } - private Object readyTxReply(ActorPath path) { - return new ReadyTransactionReply(path).toSerializable(); + private DeleteData eqDeleteData() { + ArgumentMatcher matcher = new ArgumentMatcher() { + @Override + public boolean matches(Object argument) { + return argument instanceof DeleteData && + ((DeleteData)argument).getPath().equals(TestModel.TEST_PATH); + } + }; + + return argThat(matcher); + } + + private Future readySerializedTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable()); } - private Future readDataReply(NormalizedNode data) { - return Futures.successful(new ReadDataReply(schemaContext, data) - .toSerializable()); + private Future readyTxReply(String path) { + return Futures.successful((Object)new ReadyTransactionReply(path)); } - private Future dataExistsReply(boolean exists) { + + private Future readSerializedDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data).toSerializable()); + } + + private Future readDataReply(NormalizedNode data) { + return Futures.successful(new ReadDataReply(schemaContext, data)); + } + + private Future dataExistsSerializedReply(boolean exists) { return Futures.successful(new DataExistsReply(exists).toSerializable()); } - private ActorSelection actorSelection(ActorRef actorRef) { - return getSystem().actorSelection(actorRef.path()); + private Future dataExistsReply(boolean exists) { + return Futures.successful(new DataExistsReply(exists)); + } + + private Future writeSerializedDataReply() { + return Futures.successful(new WriteDataReply().toSerializable()); + } + + private Future writeDataReply() { + return Futures.successful(new WriteDataReply()); + } + + private Future mergeSerializedDataReply() { + return Futures.successful(new MergeDataReply().toSerializable()); + } + + private Future mergeDataReply() { + return Futures.successful(new MergeDataReply()); + } + + private Future deleteSerializedDataReply() { + return Futures.successful(new DeleteDataReply().toSerializable()); + } + + private Future deleteDataReply() { + return Futures.successful(new DeleteDataReply()); } - private FiniteDuration anyDuration() { - return any(FiniteDuration.class); + private ActorSelection actorSelection(ActorRef actorRef) { + return getSystem().actorSelection(actorRef.path()); } - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ return CreateTransactionReply.newBuilder() .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1").build(); + .setTransactionId("txn-1") + .setMessageVersion(transactionVersion) + .build(); } - private ActorRef setupActorContextWithInitialCreateTransaction(TransactionType type) { - ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); - doReturn(getSystem().actorSelection(actorRef.path())). + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) { + ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); - doReturn(memberName).when(mockActorContext).getCurrentMemberName(); - doReturn(createTransactionReply(actorRef)).when(mockActorContext). - executeShardOperation(eq(DefaultShardStrategy.DEFAULT_SHARD), - eqCreateTransaction(memberName, type), anyDuration()); - doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath( - anyString(), eq(actorRef.path().toString())); - doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString()); + + doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), + eqCreateTransaction(memberName, type)); + + doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); return actorRef; } + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION); + } + + + private void propagateReadFailedExceptionCause(CheckedFuture future) + throws Throwable { + + try { + future.checkedGet(5, TimeUnit.SECONDS); + fail("Expected ReadFailedException"); + } catch(ReadFailedException e) { + throw e.getCause(); + } + } + @Test public void testRead() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); - doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); Optional> readOptional = transactionProxy.read( TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -229,8 +396,8 @@ public class TransactionProxyTest extends AbstractActorTest { NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - doReturn(readDataReply(expectedNode)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); @@ -240,60 +407,55 @@ public class TransactionProxyTest extends AbstractActorTest { } @Test(expected = ReadFailedException.class) - public void testReadWhenAnInvalidMessageIsSentInReply() throws Exception { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + public void testReadWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @Test(expected = TestException.class) public void testReadWithAsyncRemoteOperatonFailure() throws Throwable { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); - doThrow(new TestException()).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); - try { - transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); - } + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); } private void testExceptionOnInitialCreateTransaction(Exception exToThrow, Invoker invoker) throws Throwable { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); - doThrow(exToThrow).when(mockActorContext).executeShardOperation( - anyString(), any(), anyDuration()); + if (exToThrow instanceof PrimaryNotFoundException) { + doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString()); + } else { + doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(anyString()); + } - TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), any()); - try { - invoker.invoke(transactionProxy); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); - } + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + + propagateReadFailedExceptionCause(invoker.invoke(transactionProxy)); } private void testReadWithExceptionOnInitialCreateTransaction(Exception exToThrow) throws Throwable { testExceptionOnInitialCreateTransaction(exToThrow, new Invoker() { @Override - public void invoke(TransactionProxy proxy) throws Exception { - proxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.read(TestModel.TEST_PATH); } }); } @@ -314,22 +476,104 @@ public class TransactionProxyTest extends AbstractActorTest { testReadWithExceptionOnInitialCreateTransaction(new TestException()); } + @Test(expected = TestException.class) + public void testReadWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData()); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + + try { + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + } + } + + @Test + public void testReadWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(expectedNode)); + + doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, expectedNode); + + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + } + + @Test(expected=IllegalStateException.class) + public void testReadPreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.read(TestModel.TEST_PATH); + } + + @Test(expected=IllegalArgumentException.class) + public void testInvalidCreateTransactionReply() throws Throwable { + ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class)); + + doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext). + actorSelection(actorRef.path().toString()); + + doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync( + eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY); + + propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH)); + } + @Test public void testExists() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); - doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); assertEquals("Exists response", false, exists); - doReturn(dataExistsReply(true)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDataExists(), anyDuration()); + doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); @@ -340,139 +584,584 @@ public class TransactionProxyTest extends AbstractActorTest { public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable { testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"), new Invoker() { @Override - public void invoke(TransactionProxy proxy) throws Exception { - proxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); + public CheckedFuture invoke(TransactionProxy proxy) throws Exception { + return proxy.exists(TestModel.TEST_PATH); } }); } @Test(expected = ReadFailedException.class) - public void testExistsWhenAnInvalidMessageIsSentInReply() throws Exception { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + public void testExistsWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); doReturn(Futures.successful(new Object())).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); } @Test(expected = TestException.class) public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); - doThrow(new TestException()).when(mockActorContext). - executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration()); + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_ONLY, schemaContext); + READ_ONLY); + + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } + + @Test(expected = TestException.class) + public void testExistsWithPriorRecordingOperationFailure() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDeleteData()); + + doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); try { - transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS); - fail("Expected ReadFailedException"); - } catch(ReadFailedException e) { - // Expected - throw cause - expects TestException. - throw e.getCause(); + propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH)); + } finally { + verify(mockActorContext, times(0)).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); } } @Test - public void testWrite() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + public void testExistsWithPriorRecordingOperationSuccessful() throws Throwable { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDataExists()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); + } + + @Test(expected=IllegalStateException.class) + public void testExistsPreConditionCheck() { TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); + WRITE_ONLY); + + transactionProxy.exists(TestModel.TEST_PATH); + } + + private void verifyRecordingOperationFutures(List> futures, + Class... expResultTypes) throws Exception { + assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size()); + + int i = 0; + for( Future future: futures) { + assertNotNull("Recording operation Future is null", future); + + Class expResultType = expResultTypes[i++]; + if(Throwable.class.isAssignableFrom(expResultType)) { + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from recording operation Future"); + } catch(Exception e) { + // Expected + } + } else { + assertEquals("Recording operation Future result type", expResultType, + Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass()); + } + } + } + + @Test + public void testWrite() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).sendRemoteOperationAsync( - eq(actorSelection(actorRef)), eqWriteData(nodeToWrite)); + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); } - @Test - public void testMerge() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + @Test(expected=IllegalStateException.class) + public void testWritePreConditionCheck() { + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + + @Test(expected=IllegalStateException.class) + public void testWriteAfterReadyPreConditionCheck() { TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); + WRITE_ONLY); + + transactionProxy.ready(); + + transactionProxy.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + } + + @Test + public void testMerge() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); - verify(mockActorContext).sendRemoteOperationAsync( - eq(actorSelection(actorRef)), eqMergeData(nodeToWrite)); + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); } @Test public void testDelete() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + doReturn(deleteSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDeleteData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - WRITE_ONLY, schemaContext); + WRITE_ONLY); transactionProxy.delete(TestModel.TEST_PATH); - verify(mockActorContext).sendRemoteOperationAsync( - eq(actorSelection(actorRef)), eqDeleteData()); + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedDeleteData()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + DeleteDataReply.SERIALIZABLE_CLASS); + } + + private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy, + Object... expReplies) throws Exception { + assertEquals("getReadyOperationFutures size", expReplies.length, + proxy.getCohortFutures().size()); + + int i = 0; + for( Future future: proxy.getCohortFutures()) { + assertNotNull("Ready operation Future is null", future); + + Object expReply = expReplies[i++]; + if(expReply instanceof ActorSelection) { + ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + assertEquals("Cohort actor path", expReply, actual); + } else { + // Expecting exception. + try { + Await.result(future, Duration.create(5, TimeUnit.SECONDS)); + fail("Expected exception from ready operation Future"); + } catch(Exception e) { + // Expected + } + } + } } - @SuppressWarnings("unchecked") @Test public void testReady() throws Exception { - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); - doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); - doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeRemoteOperation( - eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration()); + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE, schemaContext); + READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + } + + @Test + public void testReadyForwardCompatibility() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + } + + @Test + public void testReadyWithRecordingOperationFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, TestException.class); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS, TestException.class); + } + + @Test + public void testReadyWithReplyFailure() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(mergeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedMergeData(nodeToWrite)); + + doReturn(Futures.failed(new TestException())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + MergeDataReply.SERIALIZABLE_CLASS); + + verifyCohortFutures(proxy, TestException.class); + } + + @Test + public void testReadyWithInitialCreateTransactionFailure() throws Exception { + + doReturn(Futures.failed(new PrimaryNotFoundException("mock"))).when( + mockActorContext).findPrimaryShardAsync(anyString()); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + transactionProxy.delete(TestModel.TEST_PATH); + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); assertTrue(ready instanceof ThreePhaseCommitCohortProxy); ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; - assertEquals("getCohortPaths", Arrays.asList(actorRef.path()), proxy.getCohortPaths()); + verifyCohortFutures(proxy, PrimaryNotFoundException.class); + } + + @Test + public void testReadyWithInvalidReplyMessageType() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(Futures.successful(new Object())).when(mockActorContext). + executeOperationAsync(eq(actorSelection(actorRef)), + isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + WRITE_ONLY); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, IllegalArgumentException.class); } @Test public void testGetIdentifier() { - setupActorContextWithInitialCreateTransaction(READ_ONLY); + setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - TransactionProxy.TransactionType.READ_ONLY, schemaContext); + TransactionProxy.TransactionType.READ_ONLY); Object id = transactionProxy.getIdentifier(); assertNotNull("getIdentifier returned null", id); assertTrue("Invalid identifier: " + id, id.toString().startsWith(memberName)); } - @SuppressWarnings("unchecked") @Test public void testClose() throws Exception{ - ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_WRITE); + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE); - doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync( - eq(actorSelection(actorRef)), eqReadData(), anyDuration()); + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, - READ_WRITE, schemaContext); + READ_WRITE); transactionProxy.read(TestModel.TEST_PATH); transactionProxy.close(); - verify(mockActorContext).sendRemoteOperationAsync( + verify(mockActorContext).sendOperationAsync( eq(actorSelection(actorRef)), isA(CloseTransaction.SERIALIZABLE_CLASS)); } + + + /** + * Method to test a local Tx actor. The Tx paths are matched to decide if the + * Tx actor is local or not. This is done by mocking the Tx actor path + * and the caller paths and ensuring that the paths have the remote-address format + * + * Note: Since the default akka provider for test is not a RemoteActorRefProvider, + * the paths returned for the actors for all the tests are not qualified remote paths. + * Hence are treated as non-local/remote actors. In short, all tests except + * few below run for remote actors + * + * @throws Exception + */ + @Test + public void testLocalTxActorRead() throws Exception { + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, READ_ONLY)); + + doReturn(true).when(mockActorContext).isPathLocal(actorPath); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY); + + // negative test case with null as the reply + doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + Optional> readOptional = transactionProxy.read( + TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", false, readOptional.isPresent()); + + // test case with node as read data reply + NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqReadData()); + + readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS); + + assertEquals("NormalizedNode isPresent", true, readOptional.isPresent()); + + assertEquals("Response NormalizedNode", expectedNode, readOptional.get()); + + // test for local data exists + doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDataExists()); + + boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet(); + + assertEquals("Exists response", true, exists); + } + + @Test + public void testLocalTxActorWrite() throws Exception { + ActorSystem actorSystem = getSystem(); + ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); + + doReturn(actorSystem.actorSelection(shardActorRef.path())). + when(mockActorContext).actorSelection(shardActorRef.path().toString()); + + doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))). + when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + + String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor"; + CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder() + .setTransactionId("txn-1") + .setTransactionActorPath(actorPath) + .build(); + + doReturn(Futures.successful(createTransactionReply)).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())), + eqCreateTransaction(memberName, WRITE_ONLY)); + + doReturn(true).when(mockActorContext).isPathLocal(actorPath); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY); + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqWriteData(nodeToWrite)); + + //testing local merge + doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToWrite)); + + transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite); + + verify(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqMergeData(nodeToWrite)); + + + //testing local delete + doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), eqDeleteData()); + + transactionProxy.delete(TestModel.TEST_PATH); + + verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData()); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class); + + // testing ready + doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync( + any(ActorSelection.class), isA(ReadyTransaction.class)); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path())); + } }