Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardDataTreeTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.anyBoolean;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.atLeastOnce;
18 import static org.mockito.Mockito.doNothing;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.inOrder;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
26 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
27 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
28 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
29 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
30 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
31 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
32 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
33 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
34
35 import com.google.common.base.Ticker;
36 import com.google.common.primitives.UnsignedLong;
37 import com.google.common.util.concurrent.FutureCallback;
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.HashMap;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Optional;
45 import java.util.function.Consumer;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.InOrder;
50 import org.mockito.Mockito;
51 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
52 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
53 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
54 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
55 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
56 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
57 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.yangtools.yang.common.Empty;
60 import org.opendaylight.yangtools.yang.common.Uint64;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
64 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
65 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
68 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
69 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
70 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
71 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
72 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
73 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
74 import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
75 import org.opendaylight.yangtools.yang.data.tree.api.ModificationType;
76 import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
77 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
78 import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
79 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
80 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
81
82 public class ShardDataTreeTest extends AbstractTest {
83     private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
84
85     private final Shard mockShard = Mockito.mock(Shard.class);
86     private ShardDataTree shardDataTree;
87     private EffectiveModelContext fullSchema;
88
89     @Before
90     public void setUp() {
91         doReturn(Ticker.systemTicker()).when(mockShard).ticker();
92         doReturn(new ShardStats("shardName", "mxBeanType", mockShard)).when(mockShard).getShardMBean();
93         doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
94
95         fullSchema = SchemaContextHelper.full();
96
97         shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
98     }
99
100     @Test
101     public void testWrite() {
102         modify(false, true, true);
103     }
104
105     @Test
106     public void testMerge() {
107         modify(true, true, true);
108     }
109
110     private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) {
111         immediatePayloadReplication(shardDataTree, mockShard);
112
113         assertEquals(fullSchema, shardDataTree.getSchemaContext());
114
115         final ReadWriteShardDataTreeTransaction transaction =
116                 shardDataTree.newReadWriteTransaction(nextTransactionId());
117
118         final DataTreeModification snapshot = transaction.getSnapshot();
119
120         assertNotNull(snapshot);
121
122         if (merge) {
123             snapshot.merge(CarsModel.BASE_PATH, CarsModel.create());
124             snapshot.merge(PeopleModel.BASE_PATH, PeopleModel.create());
125         } else {
126             snapshot.write(CarsModel.BASE_PATH, CarsModel.create());
127             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
128         }
129
130         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
131
132         immediateCanCommit(cohort);
133         immediatePreCommit(cohort);
134         immediateCommit(cohort);
135
136         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
137                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
138
139         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
140
141         final Optional<NormalizedNode> optional = snapshot1.readNode(CarsModel.BASE_PATH);
142
143         assertEquals(expectedCarsPresent, optional.isPresent());
144
145         final Optional<NormalizedNode> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
146
147         assertEquals(expectedPeoplePresent, optional1.isPresent());
148     }
149
150     @Test
151     public void bug4359AddRemoveCarOnce() {
152         immediatePayloadReplication(shardDataTree, mockShard);
153
154         final List<DataTreeCandidate> candidates = new ArrayList<>();
155         candidates.add(addCar(shardDataTree));
156         candidates.add(removeCar(shardDataTree));
157
158         final NormalizedNode expected = getCars(shardDataTree);
159
160         applyCandidates(shardDataTree, candidates);
161
162         final NormalizedNode actual = getCars(shardDataTree);
163
164         assertEquals(expected, actual);
165     }
166
167     @Test
168     public void bug4359AddRemoveCarTwice() {
169         immediatePayloadReplication(shardDataTree, mockShard);
170
171         final List<DataTreeCandidate> candidates = new ArrayList<>();
172         candidates.add(addCar(shardDataTree));
173         candidates.add(removeCar(shardDataTree));
174         candidates.add(addCar(shardDataTree));
175         candidates.add(removeCar(shardDataTree));
176
177         final NormalizedNode expected = getCars(shardDataTree);
178
179         applyCandidates(shardDataTree, candidates);
180
181         final NormalizedNode actual = getCars(shardDataTree);
182
183         assertEquals(expected, actual);
184     }
185
186     @Test
187     public void testListenerNotifiedOnApplySnapshot() throws Exception {
188         immediatePayloadReplication(shardDataTree, mockShard);
189
190         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
191         shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
192             Optional.empty(), noop -> { });
193
194         addCar(shardDataTree, "optima");
195
196         verifyOnDataTreeChanged(listener, dtc -> {
197             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().modificationType());
198             assertEquals("getRootPath", CarsModel.newCarPath("optima"), dtc.getRootPath());
199         });
200
201         addCar(shardDataTree, "sportage");
202
203         verifyOnDataTreeChanged(listener, dtc -> {
204             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().modificationType());
205             assertEquals("getRootPath", CarsModel.newCarPath("sportage"), dtc.getRootPath());
206         });
207
208         ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
209         immediatePayloadReplication(newDataTree, mockShard);
210         addCar(newDataTree, "optima");
211         addCar(newDataTree, "murano");
212
213         shardDataTree.applySnapshot(newDataTree.takeStateSnapshot());
214
215         Map<YangInstanceIdentifier, ModificationType> expChanges = new HashMap<>();
216         expChanges.put(CarsModel.newCarPath("optima"), ModificationType.WRITE);
217         expChanges.put(CarsModel.newCarPath("murano"), ModificationType.WRITE);
218         expChanges.put(CarsModel.newCarPath("sportage"), ModificationType.DELETE);
219         verifyOnDataTreeChanged(listener, dtc -> {
220             ModificationType expType = expChanges.remove(dtc.getRootPath());
221             assertNotNull("Got unexpected change for " + dtc.getRootPath(), expType);
222             assertEquals("getModificationType", expType, dtc.getRootNode().modificationType());
223         });
224
225         if (!expChanges.isEmpty()) {
226             fail("Missing change notifications: " + expChanges);
227         }
228     }
229
230     @Test
231     public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
232         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
233             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
234
235         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
236             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
237
238         NormalizedNode peopleNode = PeopleModel.create();
239         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
240             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
241
242         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
243         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
244         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
245
246         immediateCanCommit(cohort1);
247         final FutureCallback<Empty> canCommitCallback2 = coordinatedCanCommit(cohort2);
248         final FutureCallback<Empty> canCommitCallback3 = coordinatedCanCommit(cohort3);
249         final FutureCallback<Empty> canCommitCallback4 = coordinatedCanCommit(cohort4);
250
251         final FutureCallback<DataTreeCandidate> preCommitCallback1 = coordinatedPreCommit(cohort1);
252         verify(preCommitCallback1).onSuccess(cohort1.getCandidate());
253         verify(canCommitCallback2).onSuccess(Empty.value());
254
255         final FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
256         verify(preCommitCallback2).onSuccess(cohort2.getCandidate());
257         verify(canCommitCallback3).onSuccess(Empty.value());
258
259         final FutureCallback<DataTreeCandidate> preCommitCallback3 = coordinatedPreCommit(cohort3);
260         verify(preCommitCallback3).onSuccess(cohort3.getCandidate());
261         verify(canCommitCallback4).onSuccess(Empty.value());
262
263         final FutureCallback<DataTreeCandidate> preCommitCallback4 = coordinatedPreCommit(cohort4);
264         verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
265
266         final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
267         verify(mockShard, never()).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class),
268                 anyBoolean());
269         verifyNoMoreInteractions(commitCallback2);
270
271         final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
272         verify(mockShard, never()).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class),
273                 anyBoolean());
274         verifyNoMoreInteractions(commitCallback4);
275
276         final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
277         InOrder inOrder = inOrder(mockShard);
278         inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class),
279                 eq(true));
280         inOrder.verify(mockShard).persistPayload(eq(cohort2.transactionId()), any(CommitTransactionPayload.class),
281                 eq(false));
282         verifyNoMoreInteractions(commitCallback1);
283         verifyNoMoreInteractions(commitCallback2);
284
285         final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
286         inOrder = inOrder(mockShard);
287         inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class),
288                 eq(true));
289         inOrder.verify(mockShard).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class),
290                 eq(false));
291         verifyNoMoreInteractions(commitCallback3);
292         verifyNoMoreInteractions(commitCallback4);
293
294         final ShardDataTreeCohort cohort5 = newShardDataTreeCohort(snapshot ->
295             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
296         final FutureCallback<Empty> canCommitCallback5 = coordinatedCanCommit(cohort5);
297
298         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
299         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
300                 cohort1.getCandidate());
301         shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload);
302         shardDataTree.applyReplicatedPayload(cohort2.transactionId(), mockPayload);
303         shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload);
304         shardDataTree.applyReplicatedPayload(cohort4.transactionId(), mockPayload);
305
306         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
307         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
308         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
309         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
310         inOrder.verify(commitCallback4).onSuccess(any(UnsignedLong.class));
311
312         verify(canCommitCallback5).onSuccess(Empty.value());
313
314         final DataTreeSnapshot snapshot =
315                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
316         assertEquals("Car node", Optional.of(carNode), snapshot.readNode(carPath));
317         assertEquals("People node", Optional.of(peopleNode), snapshot.readNode(PeopleModel.BASE_PATH));
318     }
319
320     @Test
321     public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
322         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
323             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
324
325         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
326             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
327
328         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
329         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
330         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
331
332         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
333         final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
334         final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
335
336         InOrder inOrder = inOrder(mockShard);
337         inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class),
338                 eq(true));
339         inOrder.verify(mockShard).persistPayload(eq(cohort2.transactionId()), any(CommitTransactionPayload.class),
340                 eq(true));
341         inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class),
342                 eq(false));
343
344         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
345         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
346                 cohort1.getCandidate());
347         shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload);
348         shardDataTree.applyReplicatedPayload(cohort2.transactionId(), mockPayload);
349         shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload);
350
351         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
352         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
353         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
354         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
355
356         final DataTreeSnapshot snapshot =
357                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
358         assertEquals("Car node", Optional.of(carNode), snapshot.readNode(carPath));
359     }
360
361     @Test
362     public void testPipelinedTransactionsWithImmediateReplication() {
363         immediatePayloadReplication(shardDataTree, mockShard);
364
365         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
366             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
367
368         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
369             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
370
371         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
372         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
373         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
374
375         final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
376         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
377         final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
378
379         InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
380         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
381         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
382         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
383
384         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
385         Optional<NormalizedNode> optional = snapshot.readNode(CarsModel.BASE_PATH);
386         assertTrue("Car node present", optional.isPresent());
387     }
388
389     @SuppressWarnings("unchecked")
390     @Test
391     public void testAbortWithPendingCommits() throws Exception {
392         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
393             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
394
395         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
396             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
397
398         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
399             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
400
401         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
402         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
403         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
404
405         coordinatedCanCommit(cohort2);
406         immediateCanCommit(cohort1);
407         coordinatedCanCommit(cohort3);
408         coordinatedCanCommit(cohort4);
409
410         coordinatedPreCommit(cohort1);
411         coordinatedPreCommit(cohort2);
412         coordinatedPreCommit(cohort3);
413
414         FutureCallback<Empty> mockAbortCallback = mock(FutureCallback.class);
415         doNothing().when(mockAbortCallback).onSuccess(Empty.value());
416         cohort2.abort(mockAbortCallback);
417         verify(mockAbortCallback).onSuccess(Empty.value());
418
419         coordinatedPreCommit(cohort4);
420         coordinatedCommit(cohort1);
421         coordinatedCommit(cohort3);
422         coordinatedCommit(cohort4);
423
424         InOrder inOrder = inOrder(mockShard);
425         inOrder.verify(mockShard).persistPayload(eq(cohort1.transactionId()), any(CommitTransactionPayload.class),
426                 eq(false));
427         inOrder.verify(mockShard).persistPayload(eq(cohort3.transactionId()), any(CommitTransactionPayload.class),
428                 eq(false));
429         inOrder.verify(mockShard).persistPayload(eq(cohort4.transactionId()), any(CommitTransactionPayload.class),
430                 eq(false));
431
432         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
433         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
434                 cohort1.getCandidate());
435         shardDataTree.applyReplicatedPayload(cohort1.transactionId(), mockPayload);
436         shardDataTree.applyReplicatedPayload(cohort3.transactionId(), mockPayload);
437         shardDataTree.applyReplicatedPayload(cohort4.transactionId(), mockPayload);
438
439         final DataTreeSnapshot snapshot =
440                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
441         Optional<NormalizedNode> optional = snapshot.readNode(carPath);
442         assertEquals("Car node", Optional.of(carNode), optional);
443     }
444
445     @SuppressWarnings("unchecked")
446     @Test
447     public void testAbortWithFailedRebase() {
448         immediatePayloadReplication(shardDataTree, mockShard);
449
450         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
451             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
452
453         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
454             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
455
456         NormalizedNode peopleNode = PeopleModel.create();
457         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
458             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
459
460         immediateCanCommit(cohort1);
461         FutureCallback<Empty> canCommitCallback2 = coordinatedCanCommit(cohort2);
462
463         coordinatedPreCommit(cohort1);
464         verify(canCommitCallback2).onSuccess(Empty.value());
465
466         FutureCallback<Empty> mockAbortCallback = mock(FutureCallback.class);
467         doNothing().when(mockAbortCallback).onSuccess(Empty.value());
468         cohort1.abort(mockAbortCallback);
469         verify(mockAbortCallback).onSuccess(Empty.value());
470
471         FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
472         verify(preCommitCallback2).onFailure(any(Throwable.class));
473
474         immediateCanCommit(cohort3);
475         immediatePreCommit(cohort3);
476         immediateCommit(cohort3);
477
478         final DataTreeSnapshot snapshot =
479                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
480         Optional<NormalizedNode> optional = snapshot.readNode(PeopleModel.BASE_PATH);
481         assertEquals("People node", Optional.of(peopleNode), optional);
482     }
483
484     @Test
485     public void testUintCommitPayload() throws IOException {
486         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(),
487             DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.of(), bigIntegerRoot()),
488             PayloadVersion.POTASSIUM));
489
490         assertCarsUint64();
491     }
492
493     @Test
494     public void testUintSnapshot() throws IOException, DataValidationFailedException {
495         shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(bigIntegerRoot()),
496             true));
497
498         assertCarsUint64();
499     }
500
501     @Test
502     public void testUintReplay() throws DataValidationFailedException, IOException {
503         // Commit two writes and one merge, saving the data tree candidate for each.
504         //        write(foo=1)
505         //        write(foo=2)
506         //        merge(foo=3)
507         final DataTree dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
508             fullSchema);
509         DataTreeModification mod = dataTree.takeSnapshot().newModification();
510         mod.write(CarsModel.BASE_PATH, Builders.containerBuilder()
511                 .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
512                 .withChild(Builders.mapBuilder()
513                     .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
514                     .withChild(createCar("one", Uint64.ONE))
515                     .build())
516                 .build());
517         mod.ready();
518         dataTree.validate(mod);
519         final DataTreeCandidate first = dataTree.prepare(mod);
520         dataTree.commit(first);
521
522         mod = dataTree.takeSnapshot().newModification();
523         mod.write(CarsModel.newCarPath("two"), createCar("two", Uint64.TWO));
524         mod.ready();
525         dataTree.validate(mod);
526         final DataTreeCandidate second = dataTree.prepare(mod);
527         dataTree.commit(second);
528
529         mod = dataTree.takeSnapshot().newModification();
530         mod.merge(CarsModel.CAR_LIST_PATH, Builders.mapBuilder()
531             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
532             .withChild(createCar("three", Uint64.TEN))
533             .build());
534         mod.ready();
535         dataTree.validate(mod);
536         final DataTreeCandidate third = dataTree.prepare(mod);
537         dataTree.commit(third);
538
539         // Apply first candidate as a snapshot
540         shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(
541             new MetadataShardDataTreeSnapshot(first.getRootNode().getDataAfter()), true));
542         // Apply the other two snapshots as transactions
543         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), second,
544             PayloadVersion.POTASSIUM));
545         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), third,
546             PayloadVersion.POTASSIUM));
547
548         // Verify uint translation
549         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
550
551         assertEquals(Builders.mapBuilder()
552             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
553             // Note: Uint64
554             .withChild(createCar("one", Uint64.ONE))
555             .withChild(createCar("two", Uint64.TWO))
556             .withChild(createCar("three", Uint64.TEN))
557             .build(), snapshot.readNode(CarsModel.CAR_LIST_PATH).orElseThrow());
558     }
559
560     private void assertCarsUint64() {
561         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
562         final NormalizedNode cars = snapshot.readNode(CarsModel.CAR_LIST_PATH).orElseThrow();
563
564         assertEquals(Builders.mapBuilder()
565             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
566             // Note: Uint64
567             .withChild(createCar("foo", Uint64.ONE))
568             .build(), cars);
569     }
570
571     private static ContainerNode bigIntegerRoot() {
572         return Builders.containerBuilder()
573             .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
574             .withChild(Builders.containerBuilder()
575                 .withNodeIdentifier(new NodeIdentifier(CarsModel.CARS_QNAME))
576                 .withChild(Builders.mapBuilder()
577                     .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
578                     .withChild(createCar("foo", Uint64.ONE))
579                     .build())
580                 .build())
581             .build();
582     }
583
584     private static MapEntryNode createCar(final String name, final Object value) {
585         return Builders.mapEntryBuilder()
586             .withNodeIdentifier(NodeIdentifierWithPredicates.of(CarsModel.CAR_QNAME, CarsModel.CAR_NAME_QNAME, name))
587             .withChild(ImmutableNodes.leafNode(CarsModel.CAR_NAME_QNAME, name))
588             // Note: old BigInteger
589             .withChild(ImmutableNodes.leafNode(CarsModel.CAR_PRICE_QNAME, value))
590             .build();
591     }
592
593     private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
594         final ReadWriteShardDataTreeTransaction transaction =
595                 shardDataTree.newReadWriteTransaction(nextTransactionId());
596         final DataTreeModification snapshot = transaction.getSnapshot();
597         operation.execute(snapshot);
598         return shardDataTree.finishTransaction(transaction, Optional.empty());
599     }
600
601     @SuppressWarnings({ "rawtypes", "unchecked" })
602     private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
603             final Consumer<DataTreeCandidate> callback) {
604         ArgumentCaptor<List> changes = ArgumentCaptor.forClass(List.class);
605         verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
606         for (Collection list : changes.getAllValues()) {
607             for (Object dtc : list) {
608                 callback.accept((DataTreeCandidate)dtc);
609             }
610         }
611
612         reset(listener);
613     }
614
615     private static NormalizedNode getCars(final ShardDataTree shardDataTree) {
616         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
617                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
618         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
619
620         final Optional<NormalizedNode> optional = snapshot1.readNode(CarsModel.BASE_PATH);
621
622         assertTrue(optional.isPresent());
623
624         return optional.orElseThrow();
625     }
626
627     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) {
628         return addCar(shardDataTree, "altima");
629     }
630
631     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) {
632         return doTransaction(shardDataTree, snapshot -> {
633             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
634             snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
635             snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, Uint64.valueOf(100)));
636         });
637     }
638
639     private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) {
640         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
641     }
642
643     @FunctionalInterface
644     private interface DataTreeOperation {
645         void execute(DataTreeModification snapshot);
646     }
647
648     private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
649             final DataTreeOperation operation) {
650         final ReadWriteShardDataTreeTransaction transaction =
651                 shardDataTree.newReadWriteTransaction(nextTransactionId());
652         final DataTreeModification snapshot = transaction.getSnapshot();
653         operation.execute(snapshot);
654         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
655
656         immediateCanCommit(cohort);
657         immediatePreCommit(cohort);
658         final DataTreeCandidate candidate = cohort.getCandidate();
659         immediateCommit(cohort);
660
661         return candidate;
662     }
663
664     private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
665             final List<DataTreeCandidate> candidates) {
666         final ReadWriteShardDataTreeTransaction transaction =
667                 shardDataTree.newReadWriteTransaction(nextTransactionId());
668         final DataTreeModification snapshot = transaction.getSnapshot();
669         for (final DataTreeCandidate candidateTip : candidates) {
670             DataTreeCandidates.applyToModification(snapshot, candidateTip);
671         }
672         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
673
674         immediateCanCommit(cohort);
675         immediatePreCommit(cohort);
676         final DataTreeCandidate candidate = cohort.getCandidate();
677         immediateCommit(cohort);
678
679         return candidate;
680     }
681 }