+ shard.tell(
+ prepareBatchedModifications(transactionID2, listNodePath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), false), testKit.getRef());
+ testKit.expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // canCommit 1st Tx. We don't send the commit so it should
+ // timeout.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
+ testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ // canCommit the 2nd Tx - it should complete after the 1st Tx
+ // times out.
+
+ shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
+ testKit.expectMsgClass(duration, CanCommitTransactionReply.class);
+
+ // Try to commit the 1st Tx - should fail as it's not the
+ // current Tx.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
+ testKit.expectMsgClass(duration, akka.actor.Status.Failure.class);
+
+ // Commit the 2nd Tx.
+
+ shard.tell(new CommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), testKit.getRef());
+ testKit.expectMsgClass(duration, CommitTransactionReply.class);
+
+ final NormalizedNode<?, ?> node = readStore(shard, listNodePath);
+ assertNotNull(listNodePath + " not found", node);
+ }
+
+// @Test
+// @Ignore
+// public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
+// dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
+//
+// new ShardTestKit(getSystem()) {{
+// final TestActorRef<Shard> shard = actorFactory.createTestActor(
+// newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+// "testTransactionCommitQueueCapacityExceeded");
+//
+// waitUntilLeader(shard);
+//
+// final FiniteDuration duration = duration("5 seconds");
+//
+// final ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+//
+// final TransactionIdentifier transactionID1 = nextTransactionId();
+// final MutableCompositeModification modification1 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID1,
+// modification1);
+//
+// final TransactionIdentifier transactionID2 = nextTransactionId();
+// final MutableCompositeModification modification2 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+// TestModel.OUTER_LIST_PATH,
+// ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), transactionID2,
+// modification2);
+//
+// final TransactionIdentifier transactionID3 = nextTransactionId();
+// final MutableCompositeModification modification3 = new MutableCompositeModification();
+// final ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+// TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), transactionID3,
+// modification3);
+//
+// // Ready the Tx's
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort1, transactionID1,
+// modification1), getRef());
+// expectMsgClass(duration, ReadyTransactionReply.class);
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort2, transactionID2,
+// modification2), getRef());
+// expectMsgClass(duration, ReadyTransactionReply.class);
+//
+// // The 3rd Tx should exceed queue capacity and fail.
+//
+// shard.tell(prepareReadyTransactionMessage(false, shard.underlyingActor(), cohort3, transactionID3,
+// modification3), getRef());
+// expectMsgClass(duration, akka.actor.Status.Failure.class);
+//
+// // canCommit 1st Tx.
+//
+// shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+// expectMsgClass(duration, CanCommitTransactionReply.class);
+//
+// // canCommit the 2nd Tx - it should get queued.
+//
+// shard.tell(new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), getRef());
+//
+// // canCommit the 3rd Tx - should exceed queue capacity and fail.
+//
+// shard.tell(new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), getRef());
+// expectMsgClass(duration, akka.actor.Status.Failure.class);
+// }};
+// }