+ void onSuccess(final Object resp) {
+ }
+ }
+
+ class OnCommitFutureComplete extends OnFutureComplete {
+ OnCommitFutureComplete() {
+ super(CommitTransactionReply.class);
+ }
+
+ @Override
+ public void onComplete(final Throwable error, final Object resp) {
+ super.onComplete(error, resp);
+ commitLatch.countDown();
+ }
+ }
+
+ class OnCanCommitFutureComplete extends OnFutureComplete {
+ private final TransactionIdentifier transactionID;
+
+ OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
+ super(CanCommitTransactionReply.class);
+ this.transactionID = transactionID;
+ }
+
+ @Override
+ void onSuccess(final Object resp) {
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp);
+ assertTrue("Can commit", canCommitReply.getCanCommit());
+
+ final Future<Object> commitFuture = Patterns.ask(shard,
+ new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
+ commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
+ }
+ }
+
+ final ShardTestKit testKit = new ShardTestKit(getSystem());
+ ShardTestKit.waitUntilLeader(shard);
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+
+ final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
+ shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
+ final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
+ final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
+ final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
+
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), testKit.getRef());
+ final ReadyTransactionReply readyReply = ReadyTransactionReply
+ .fromSerializable(testKit.expectMsgClass(duration, ReadyTransactionReply.class));
+
+ String pathSuffix = shard.path().toString().replaceFirst("akka://test", "");
+ assertTrue("Cohort path", readyReply.getCohortPath().endsWith(pathSuffix));
+ // Send the CanCommitTransaction message for the first Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), testKit.getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
+ .fromSerializable(testKit.expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertTrue("Can commit", canCommitReply.getCanCommit());
+
+ // Ready 2 more Tx's.
+
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), testKit.getRef());
+ testKit.expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(
+ prepareBatchedModifications(transactionID3,
+ YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false), testKit.getRef());
+ testKit.expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message for the next 2 Tx's.
+ // These should get queued and
+ // processed after the first Tx completes.