+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithRecordingOperationFailure() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
+
+ verifyCohortFutures(proxy, TestException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithReplyFailure() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(mergeDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqMergeData(nodeToWrite));
+
+ doReturn(Futures.failed(new TestException())).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ MergeDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortFutures(proxy, TestException.class);
+ }
+
+ @Test
+ public void testReadyWithInitialCreateTransactionFailure() throws Exception {
+
+ doReturn(Optional.absent()).when(mockActorContext).findPrimaryShard(anyString());
+// doThrow(new PrimaryNotFoundException("mock")).when(mockActorContext).executeShardOperation(
+// anyString(), any());
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyCohortFutures(proxy, PrimaryNotFoundException.class);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyWithInvalidReplyMessageType() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(WRITE_ONLY);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
+
+ doReturn(Futures.successful(new Object())).when(mockActorContext).
+ executeOperationAsync(eq(actorSelection(actorRef)),
+ isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ WRITE_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+