+
+
+ /**
+ * Method to test a local Tx actor. The Tx paths are matched to decide if the
+ * Tx actor is local or not. This is done by mocking the Tx actor path
+ * and the caller paths and ensuring that the paths have the remote-address format
+ *
+ * Note: Since the default akka provider for test is not a RemoteActorRefProvider,
+ * the paths returned for the actors for all the tests are not qualified remote paths.
+ * Hence are treated as non-local/remote actors. In short, all tests except
+ * few below run for remote actors
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testLocalTxActorRead() throws Exception {
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+ .setTransactionId("txn-1")
+ .setTransactionActorPath(actorPath)
+ .build();
+
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, READ_ONLY));
+
+ doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
+
+ // negative test case with null as the reply
+ doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+ TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", false, readOptional.isPresent());
+
+ // test case with node as read data reply
+ NormalizedNode<?, ?> expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqReadData());
+
+ readOptional = transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
+
+ assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+ assertEquals("Response NormalizedNode", expectedNode, readOptional.get());
+
+ // test for local data exists
+ doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDataExists());
+
+ boolean exists = transactionProxy.exists(TestModel.TEST_PATH).checkedGet();
+
+ assertEquals("Exists response", true, exists);
+ }
+
+ @Test
+ public void testLocalTxActorWrite() throws Exception {
+ ActorSystem actorSystem = getSystem();
+ ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+ doReturn(actorSystem.actorSelection(shardActorRef.path())).
+ when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+ doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
+
+ String actorPath = "akka.tcp://system@127.0.0.1:2550/user/tx-actor";
+ CreateTransactionReply createTransactionReply = CreateTransactionReply.newBuilder()
+ .setTransactionId("txn-1")
+ .setTransactionActorPath(actorPath)
+ .build();
+
+ doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).
+ executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+ eqCreateTransaction(memberName, WRITE_ONLY));
+
+ doReturn(true).when(mockActorContext).isPathLocal(actorPath);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ verify(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqWriteData(nodeToWrite));
+
+ //testing local merge
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToWrite));
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ verify(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqMergeData(nodeToWrite));
+
+
+ //testing local delete
+ doReturn(deleteDataReply()).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), eqDeleteData());
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ verify(mockActorContext).executeOperationAsync(any(ActorSelection.class), eqDeleteData());
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.class, MergeDataReply.class, DeleteDataReply.class);
+
+ // testing ready
+ doReturn(readyTxReply(shardActorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ any(ActorSelection.class), isA(ReadyTransaction.class));
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(shardActorRef.path()));
+ }