+
+ @Test
+ public void testBatchedModificationsWithCommitOnReady() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testBatchedModificationsWithCommitOnReady");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx";
+
+ BatchedModifications batched = new BatchedModifications(transactionID,
+ DataStoreVersions.LITHIUM_VERSION, "");
+ batched.addModification(new WriteModification(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
+ batched.setReady(true);
+ batched.setDoCommitOnReady(true);
+ batched.setTotalMessagesSent(1);
+
+ shard.tell(batched, getRef());
+ expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testImmediateCommitWithForwardedReadyTransaction() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testImmediateCommitWithForwardedReadyTransaction");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx";
+
+ shard.tell(prepareForwardedReadyTransaction(mockShardDataTreeCohort(), transactionID,
+ DataStoreVersions.LITHIUM_VERSION, true), getRef());
+
+ expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ }};
+ }
+
+ @Test
+ public void testThreePhaseCommitOnForwardedReadyTransaction() throws Throwable {
+ new ShardTestKit(getSystem()) {{
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ "testThreePhaseCommitOnForwardedReadyTransaction");
+
+ waitUntilLeader(shard);
+
+ final String transactionID = "tx";
+
+ shard.tell(prepareForwardedReadyTransaction(mockShardDataTreeCohort(), transactionID,
+ DataStoreVersions.LITHIUM_VERSION, false), getRef());
+ expectMsgClass(ReadyTransactionReply.class);
+
+ shard.tell(new CanCommitTransaction(transactionID, DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef());
+ CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+ expectMsgClass(ThreePhaseCommitCohortMessages.CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ shard.tell(new CommitTransaction(transactionID, DataStoreVersions.LITHIUM_VERSION).toSerializable(), getRef());
+ expectMsgClass(ThreePhaseCommitCohortMessages.CommitTransactionReply.class);
+ }};
+ }