+ @Test
+ public void testPipelinedTransactionsWithImmediateReplication() {
+ immediatePayloadReplication(shardDataTree, mockShard);
+
+ final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
+ snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
+
+ final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
+ snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
+
+ YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+ MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
+ final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
+
+ final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
+ final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
+ final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
+
+ InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
+ inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
+ inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
+ inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
+
+ final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
+ Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(CarsModel.BASE_PATH);
+ assertTrue("Car node present", optional.isPresent());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAbortWithPendingCommits() throws Exception {
+ final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
+ snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
+
+ final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
+ snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
+
+ final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
+ snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
+
+ YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+ MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
+ final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
+
+ coordinatedCanCommit(cohort2);
+ immediateCanCommit(cohort1);
+ coordinatedCanCommit(cohort3);
+ coordinatedCanCommit(cohort4);
+
+ coordinatedPreCommit(cohort1);
+ coordinatedPreCommit(cohort2);
+ coordinatedPreCommit(cohort3);
+
+ FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
+ doNothing().when(mockAbortCallback).onSuccess(null);
+ cohort2.abort(mockAbortCallback);
+ verify(mockAbortCallback).onSuccess(null);
+
+ coordinatedPreCommit(cohort4);
+ coordinatedCommit(cohort1);
+ coordinatedCommit(cohort3);
+ coordinatedCommit(cohort4);
+
+ InOrder inOrder = inOrder(mockShard);
+ inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
+ eq(false));
+ inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
+ eq(false));
+ inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
+ eq(false));
+
+ // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
+ CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
+ cohort1.getCandidate());
+ shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
+ shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
+
+ final DataTreeSnapshot snapshot =
+ shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
+ Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
+ assertTrue("Car node present", optional.isPresent());
+ assertEquals("Car node", carNode, optional.get());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAbortWithFailedRebase() {
+ immediatePayloadReplication(shardDataTree, mockShard);
+
+ final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
+ snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
+
+ final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
+ snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
+
+ NormalizedNode<?, ?> peopleNode = PeopleModel.create();
+ final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
+ snapshot.write(PeopleModel.BASE_PATH, peopleNode));
+
+ immediateCanCommit(cohort1);
+ FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
+
+ coordinatedPreCommit(cohort1);
+ verify(canCommitCallback2).onSuccess(null);
+
+ FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
+ doNothing().when(mockAbortCallback).onSuccess(null);
+ cohort1.abort(mockAbortCallback);
+ verify(mockAbortCallback).onSuccess(null);
+
+ FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
+ verify(preCommitCallback2).onFailure(any(Throwable.class));
+
+ immediateCanCommit(cohort3);
+ immediatePreCommit(cohort3);
+ immediateCommit(cohort3);
+
+ final DataTreeSnapshot snapshot =
+ shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
+ Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(PeopleModel.BASE_PATH);
+ assertTrue("People node present", optional.isPresent());
+ assertEquals("People node", peopleNode, optional.get());
+ }
+
+ private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
+ final ReadWriteShardDataTreeTransaction transaction =
+ shardDataTree.newReadWriteTransaction(nextTransactionId());
+ final DataTreeModification snapshot = transaction.getSnapshot();
+ operation.execute(snapshot);
+ return shardDataTree.finishTransaction(transaction, Optional.empty());
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
+ final Consumer<DataTreeCandidate> callback) {
+ ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
+ verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
+ for (Collection list : changes.getAllValues()) {
+ for (Object dtc : list) {
+ callback.accept((DataTreeCandidate)dtc);
+ }
+ }
+
+ reset(listener);
+ }
+
+ private static NormalizedNode<?, ?> getCars(final ShardDataTree shardDataTree) {
+ final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
+ shardDataTree.newReadOnlyTransaction(nextTransactionId());
+ final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
+
+ final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
+
+ assertTrue(optional.isPresent());
+
+ return optional.get();
+ }
+
+ private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) {
+ return addCar(shardDataTree, "altima");
+ }
+
+ private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) {
+ return doTransaction(shardDataTree, snapshot -> {
+ snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, new BigInteger("100")));
+ });
+ }
+
+ private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) {
+ return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
+ }
+
+ @FunctionalInterface
+ private interface DataTreeOperation {
+ void execute(DataTreeModification snapshot);
+ }
+
+ private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
+ final DataTreeOperation operation) {
+ final ReadWriteShardDataTreeTransaction transaction =
+ shardDataTree.newReadWriteTransaction(nextTransactionId());
+ final DataTreeModification snapshot = transaction.getSnapshot();
+ operation.execute(snapshot);
+ final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
+
+ immediateCanCommit(cohort);
+ immediatePreCommit(cohort);
+ final DataTreeCandidate candidate = cohort.getCandidate();
+ immediateCommit(cohort);
+
+ return candidate;
+ }
+
+ private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
+ final List<DataTreeCandidate> candidates) {
+ final ReadWriteShardDataTreeTransaction transaction =
+ shardDataTree.newReadWriteTransaction(nextTransactionId());
+ final DataTreeModification snapshot = transaction.getSnapshot();
+ for (final DataTreeCandidate candidateTip : candidates) {
+ DataTreeCandidates.applyToModification(snapshot, candidateTip);
+ }
+ final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
+
+ immediateCanCommit(cohort);
+ immediatePreCommit(cohort);
+ final DataTreeCandidate candidate = cohort.getCandidate();
+ immediateCommit(cohort);
+
+ return candidate;
+ }
+}