+ OnCanCommitFutureComplete(final TransactionIdentifier transactionID) {
+ super(CanCommitTransactionReply.class);
+ this.transactionID = transactionID;
+ }
+
+ @Override
+ void onSuccess(final Object resp) throws Exception {
+ final CanCommitTransactionReply canCommitReply =
+ CanCommitTransactionReply.fromSerializable(resp);
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ final Future<Object> commitFuture = Patterns.ask(shard,
+ new CommitTransaction(transactionID, CURRENT_VERSION).toSerializable(), timeout);
+ commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
+ }
+ }
+
+ new ShardTestKit(getSystem()) {
+ {
+ waitUntilLeader(shard);
+
+ final TransactionIdentifier transactionID1 = nextTransactionId();
+ final TransactionIdentifier transactionID2 = nextTransactionId();
+ final TransactionIdentifier transactionID3 = nextTransactionId();
+
+ final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = setupCohortDecorator(
+ shard.underlyingActor(), transactionID1, transactionID2, transactionID3);
+ final CapturingShardDataTreeCohort cohort1 = cohortMap.get(transactionID1);
+ final CapturingShardDataTreeCohort cohort2 = cohortMap.get(transactionID2);
+ final CapturingShardDataTreeCohort cohort3 = cohortMap.get(transactionID3);
+
+ shard.tell(prepareBatchedModifications(transactionID1, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), false), getRef());
+ final ReadyTransactionReply readyReply = ReadyTransactionReply
+ .fromSerializable(expectMsgClass(duration, ReadyTransactionReply.class));
+ assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
+ // Send the CanCommitTransaction message for the first Tx.
+
+ shard.tell(new CanCommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ final CanCommitTransactionReply canCommitReply = CanCommitTransactionReply
+ .fromSerializable(expectMsgClass(duration, CanCommitTransactionReply.class));
+ assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+ // Ready 2 more Tx's.
+
+ shard.tell(prepareBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), false), getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ shard.tell(
+ prepareBatchedModifications(transactionID3,
+ YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), false),
+ getRef());
+ expectMsgClass(duration, ReadyTransactionReply.class);
+
+ // Send the CanCommitTransaction message for the next 2 Tx's.
+ // These should get queued and
+ // processed after the first Tx completes.
+
+ final Future<Object> canCommitFuture1 = Patterns.ask(shard,
+ new CanCommitTransaction(transactionID2, CURRENT_VERSION).toSerializable(), timeout);
+
+ final Future<Object> canCommitFuture2 = Patterns.ask(shard,
+ new CanCommitTransaction(transactionID3, CURRENT_VERSION).toSerializable(), timeout);
+
+ // Send the CommitTransaction message for the first Tx. After it
+ // completes, it should
+ // trigger the 2nd Tx to proceed which should in turn then
+ // trigger the 3rd.
+
+ shard.tell(new CommitTransaction(transactionID1, CURRENT_VERSION).toSerializable(), getRef());
+ expectMsgClass(duration, CommitTransactionReply.class);
+
+ // Wait for the next 2 Tx's to complete.
+
+ canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher());
+
+ canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher());
+
+ final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
+
+ if (caughtEx.get() != null) {
+ Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
+ Throwables.propagate(caughtEx.get());
+ }
+
+ assertEquals("Commits complete", true, done);
+
+// final InOrder inOrder = inOrder(cohort1.getCanCommit(), cohort1.getPreCommit(), cohort1.getCommit(),
+// cohort2.getCanCommit(), cohort2.getPreCommit(), cohort2.getCommit(), cohort3.getCanCommit(),
+// cohort3.getPreCommit(), cohort3.getCommit());
+// inOrder.verify(cohort1.getCanCommit()).onSuccess(any(Void.class));
+// inOrder.verify(cohort1.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+// inOrder.verify(cohort2.getCanCommit()).onSuccess(any(Void.class));
+// inOrder.verify(cohort2.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+// inOrder.verify(cohort3.getCanCommit()).onSuccess(any(Void.class));
+// inOrder.verify(cohort3.getPreCommit()).onSuccess(any(DataTreeCandidate.class));
+// inOrder.verify(cohort1.getCommit()).onSuccess(any(UnsignedLong.class));
+// inOrder.verify(cohort2.getCommit()).onSuccess(any(UnsignedLong.class));
+// inOrder.verify(cohort3.getCommit()).onSuccess(any(UnsignedLong.class));
+
+ // Verify data in the data store.
+
+ verifyOuterListEntry(shard, 1);
+
+ verifyLastApplied(shard, 2);
+ }
+ };