Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardDataTreeTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.anyBoolean;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.atLeastOnce;
18 import static org.mockito.Mockito.doNothing;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.inOrder;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
26 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
27 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
28 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
29 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
30 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
31 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
32 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
33 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
34
35 import com.google.common.base.Ticker;
36 import com.google.common.primitives.UnsignedLong;
37 import com.google.common.util.concurrent.FutureCallback;
38 import java.io.IOException;
39 import java.math.BigInteger;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Optional;
46 import java.util.function.Consumer;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
53 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
54 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
55 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
56 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
57 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
61 import org.opendaylight.yangtools.yang.common.Uint64;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
65 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
69 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
75 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
76 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
77 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
78 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
79 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
80 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
81
82 public class ShardDataTreeTest extends AbstractTest {
83     private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
84
85     private final Shard mockShard = Mockito.mock(Shard.class);
86     private ShardDataTree shardDataTree;
87     private SchemaContext fullSchema;
88
89     @Before
90     public void setUp() {
91         doReturn(Ticker.systemTicker()).when(mockShard).ticker();
92         doReturn(mock(ShardStats.class)).when(mockShard).getShardMBean();
93         doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
94
95         fullSchema = SchemaContextHelper.full();
96
97         shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
98     }
99
100     @Test
101     public void testWrite() {
102         modify(false, true, true);
103     }
104
105     @Test
106     public void testMerge() {
107         modify(true, true, true);
108     }
109
110     private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) {
111         immediatePayloadReplication(shardDataTree, mockShard);
112
113         assertEquals(fullSchema, shardDataTree.getSchemaContext());
114
115         final ReadWriteShardDataTreeTransaction transaction =
116                 shardDataTree.newReadWriteTransaction(nextTransactionId());
117
118         final DataTreeModification snapshot = transaction.getSnapshot();
119
120         assertNotNull(snapshot);
121
122         if (merge) {
123             snapshot.merge(CarsModel.BASE_PATH, CarsModel.create());
124             snapshot.merge(PeopleModel.BASE_PATH, PeopleModel.create());
125         } else {
126             snapshot.write(CarsModel.BASE_PATH, CarsModel.create());
127             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
128         }
129
130         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
131
132         immediateCanCommit(cohort);
133         immediatePreCommit(cohort);
134         immediateCommit(cohort);
135
136         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
137                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
138
139         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
140
141         final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
142
143         assertEquals(expectedCarsPresent, optional.isPresent());
144
145         final Optional<NormalizedNode<?, ?>> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
146
147         assertEquals(expectedPeoplePresent, optional1.isPresent());
148     }
149
150     @Test
151     public void bug4359AddRemoveCarOnce() {
152         immediatePayloadReplication(shardDataTree, mockShard);
153
154         final List<DataTreeCandidate> candidates = new ArrayList<>();
155         candidates.add(addCar(shardDataTree));
156         candidates.add(removeCar(shardDataTree));
157
158         final NormalizedNode<?, ?> expected = getCars(shardDataTree);
159
160         applyCandidates(shardDataTree, candidates);
161
162         final NormalizedNode<?, ?> actual = getCars(shardDataTree);
163
164         assertEquals(expected, actual);
165     }
166
167     @Test
168     public void bug4359AddRemoveCarTwice() {
169         immediatePayloadReplication(shardDataTree, mockShard);
170
171         final List<DataTreeCandidate> candidates = new ArrayList<>();
172         candidates.add(addCar(shardDataTree));
173         candidates.add(removeCar(shardDataTree));
174         candidates.add(addCar(shardDataTree));
175         candidates.add(removeCar(shardDataTree));
176
177         final NormalizedNode<?, ?> expected = getCars(shardDataTree);
178
179         applyCandidates(shardDataTree, candidates);
180
181         final NormalizedNode<?, ?> actual = getCars(shardDataTree);
182
183         assertEquals(expected, actual);
184     }
185
186     @Test
187     public void testListenerNotifiedOnApplySnapshot() throws Exception {
188         immediatePayloadReplication(shardDataTree, mockShard);
189
190         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
191         shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
192             Optional.empty(), noop -> { });
193
194         addCar(shardDataTree, "optima");
195
196         verifyOnDataTreeChanged(listener, dtc -> {
197             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
198             assertEquals("getRootPath", CarsModel.newCarPath("optima"), dtc.getRootPath());
199         });
200
201         addCar(shardDataTree, "sportage");
202
203         verifyOnDataTreeChanged(listener, dtc -> {
204             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
205             assertEquals("getRootPath", CarsModel.newCarPath("sportage"), dtc.getRootPath());
206         });
207
208         ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
209         immediatePayloadReplication(newDataTree, mockShard);
210         addCar(newDataTree, "optima");
211         addCar(newDataTree, "murano");
212
213         shardDataTree.applySnapshot(newDataTree.takeStateSnapshot());
214
215         Map<YangInstanceIdentifier, ModificationType> expChanges = new HashMap<>();
216         expChanges.put(CarsModel.newCarPath("optima"), ModificationType.WRITE);
217         expChanges.put(CarsModel.newCarPath("murano"), ModificationType.WRITE);
218         expChanges.put(CarsModel.newCarPath("sportage"), ModificationType.DELETE);
219         verifyOnDataTreeChanged(listener, dtc -> {
220             ModificationType expType = expChanges.remove(dtc.getRootPath());
221             assertNotNull("Got unexpected change for " + dtc.getRootPath(), expType);
222             assertEquals("getModificationType", expType, dtc.getRootNode().getModificationType());
223         });
224
225         if (!expChanges.isEmpty()) {
226             fail("Missing change notifications: " + expChanges);
227         }
228     }
229
230     @Test
231     public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
232         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
233             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
234
235         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
236             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
237
238         NormalizedNode<?, ?> peopleNode = PeopleModel.create();
239         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
240             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
241
242         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
243         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
244         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
245
246         immediateCanCommit(cohort1);
247         final FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
248         final FutureCallback<Void> canCommitCallback3 = coordinatedCanCommit(cohort3);
249         final FutureCallback<Void> canCommitCallback4 = coordinatedCanCommit(cohort4);
250
251         final FutureCallback<DataTreeCandidate> preCommitCallback1 = coordinatedPreCommit(cohort1);
252         verify(preCommitCallback1).onSuccess(cohort1.getCandidate());
253         verify(canCommitCallback2).onSuccess(null);
254
255         final FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
256         verify(preCommitCallback2).onSuccess(cohort2.getCandidate());
257         verify(canCommitCallback3).onSuccess(null);
258
259         final FutureCallback<DataTreeCandidate> preCommitCallback3 = coordinatedPreCommit(cohort3);
260         verify(preCommitCallback3).onSuccess(cohort3.getCandidate());
261         verify(canCommitCallback4).onSuccess(null);
262
263         final FutureCallback<DataTreeCandidate> preCommitCallback4 = coordinatedPreCommit(cohort4);
264         verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
265
266         final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
267         verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
268                 anyBoolean());
269         verifyNoMoreInteractions(commitCallback2);
270
271         final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
272         verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
273                 anyBoolean());
274         verifyNoMoreInteractions(commitCallback4);
275
276         final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
277         InOrder inOrder = inOrder(mockShard);
278         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
279                 eq(true));
280         inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
281                 eq(false));
282         verifyNoMoreInteractions(commitCallback1);
283         verifyNoMoreInteractions(commitCallback2);
284
285         final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
286         inOrder = inOrder(mockShard);
287         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
288                 eq(true));
289         inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
290                 eq(false));
291         verifyNoMoreInteractions(commitCallback3);
292         verifyNoMoreInteractions(commitCallback4);
293
294         final ShardDataTreeCohort cohort5 = newShardDataTreeCohort(snapshot ->
295             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
296         final FutureCallback<Void> canCommitCallback5 = coordinatedCanCommit(cohort5);
297
298         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
299         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
300                 cohort1.getCandidate());
301         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
302         shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
303         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
304         shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
305
306         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
307         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
308         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
309         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
310         inOrder.verify(commitCallback4).onSuccess(any(UnsignedLong.class));
311
312         verify(canCommitCallback5).onSuccess(null);
313
314         final DataTreeSnapshot snapshot =
315                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
316         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
317         assertTrue("Car node present", optional.isPresent());
318         assertEquals("Car node", carNode, optional.get());
319
320         optional = snapshot.readNode(PeopleModel.BASE_PATH);
321         assertTrue("People node present", optional.isPresent());
322         assertEquals("People node", peopleNode, optional.get());
323     }
324
325     @Test
326     public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
327         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
328             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
329
330         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
331             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
332
333         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
334         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
335         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
336
337         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
338         final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
339         final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
340
341         InOrder inOrder = inOrder(mockShard);
342         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
343                 eq(true));
344         inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
345                 eq(true));
346         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
347                 eq(false));
348
349         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
350         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
351                 cohort1.getCandidate());
352         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
353         shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
354         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
355
356         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
357         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
358         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
359         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
360
361         final DataTreeSnapshot snapshot =
362                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
363         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
364         assertTrue("Car node present", optional.isPresent());
365         assertEquals("Car node", carNode, optional.get());
366     }
367
368     @Test
369     public void testPipelinedTransactionsWithImmediateReplication() {
370         immediatePayloadReplication(shardDataTree, mockShard);
371
372         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
373             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
374
375         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
376             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
377
378         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
379         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
380         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
381
382         final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
383         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
384         final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
385
386         InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
387         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
388         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
389         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
390
391         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
392         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(CarsModel.BASE_PATH);
393         assertTrue("Car node present", optional.isPresent());
394     }
395
396     @SuppressWarnings("unchecked")
397     @Test
398     public void testAbortWithPendingCommits() throws Exception {
399         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
400             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
401
402         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
403             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
404
405         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
406             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
407
408         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
409         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
410         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
411
412         coordinatedCanCommit(cohort2);
413         immediateCanCommit(cohort1);
414         coordinatedCanCommit(cohort3);
415         coordinatedCanCommit(cohort4);
416
417         coordinatedPreCommit(cohort1);
418         coordinatedPreCommit(cohort2);
419         coordinatedPreCommit(cohort3);
420
421         FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
422         doNothing().when(mockAbortCallback).onSuccess(null);
423         cohort2.abort(mockAbortCallback);
424         verify(mockAbortCallback).onSuccess(null);
425
426         coordinatedPreCommit(cohort4);
427         coordinatedCommit(cohort1);
428         coordinatedCommit(cohort3);
429         coordinatedCommit(cohort4);
430
431         InOrder inOrder = inOrder(mockShard);
432         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
433                 eq(false));
434         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
435                 eq(false));
436         inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
437                 eq(false));
438
439         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
440         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
441                 cohort1.getCandidate());
442         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
443         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
444         shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
445
446         final DataTreeSnapshot snapshot =
447                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
448         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
449         assertTrue("Car node present", optional.isPresent());
450         assertEquals("Car node", carNode, optional.get());
451     }
452
453     @SuppressWarnings("unchecked")
454     @Test
455     public void testAbortWithFailedRebase() {
456         immediatePayloadReplication(shardDataTree, mockShard);
457
458         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
459             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
460
461         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
462             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
463
464         NormalizedNode<?, ?> peopleNode = PeopleModel.create();
465         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
466             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
467
468         immediateCanCommit(cohort1);
469         FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
470
471         coordinatedPreCommit(cohort1);
472         verify(canCommitCallback2).onSuccess(null);
473
474         FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
475         doNothing().when(mockAbortCallback).onSuccess(null);
476         cohort1.abort(mockAbortCallback);
477         verify(mockAbortCallback).onSuccess(null);
478
479         FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
480         verify(preCommitCallback2).onFailure(any(Throwable.class));
481
482         immediateCanCommit(cohort3);
483         immediatePreCommit(cohort3);
484         immediateCommit(cohort3);
485
486         final DataTreeSnapshot snapshot =
487                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
488         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(PeopleModel.BASE_PATH);
489         assertTrue("People node present", optional.isPresent());
490         assertEquals("People node", peopleNode, optional.get());
491     }
492
493     @Test
494     public void testUintCommitPayload() throws IOException {
495         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(),
496             DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), bigIntegerRoot()),
497             PayloadVersion.SODIUM_SR1));
498
499         assertCarsUint64();
500     }
501
502     @Test
503     public void testUintSnapshot() throws IOException, DataValidationFailedException {
504         shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(bigIntegerRoot()),
505             true));
506
507         assertCarsUint64();
508     }
509
510     @Test
511     public void testUintReplay() throws DataValidationFailedException, IOException {
512         // Commit two writes and one merge, saving the data tree candidate for each.
513         //        write(foo=1)
514         //        write(foo=2)
515         //        merge(foo=3)
516         final DataTree dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
517             fullSchema);
518         DataTreeModification mod = dataTree.takeSnapshot().newModification();
519         mod.write(CarsModel.BASE_PATH, Builders.containerBuilder()
520                 .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
521                 .withChild(Builders.mapBuilder()
522                     .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
523                     .withChild(createCar("one", BigInteger.ONE))
524                     .build())
525                 .build());
526         mod.ready();
527         dataTree.validate(mod);
528         final DataTreeCandidate first = dataTree.prepare(mod);
529         dataTree.commit(first);
530
531         mod = dataTree.takeSnapshot().newModification();
532         mod.write(CarsModel.newCarPath("two"), createCar("two", BigInteger.TWO));
533         mod.ready();
534         dataTree.validate(mod);
535         final DataTreeCandidate second = dataTree.prepare(mod);
536         dataTree.commit(second);
537
538         mod = dataTree.takeSnapshot().newModification();
539         mod.merge(CarsModel.CAR_LIST_PATH, Builders.mapBuilder()
540             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
541             .withChild(createCar("three", BigInteger.TEN))
542             .build());
543         mod.ready();
544         dataTree.validate(mod);
545         final DataTreeCandidate third = dataTree.prepare(mod);
546         dataTree.commit(third);
547
548         // Apply first candidate as a snapshot
549         shardDataTree.applyRecoverySnapshot(
550             new ShardSnapshotState(new MetadataShardDataTreeSnapshot(first.getRootNode().getDataAfter().get()), true));
551         // Apply the other two snapshots as transactions
552         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), second,
553             PayloadVersion.SODIUM_SR1));
554         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), third,
555             PayloadVersion.SODIUM_SR1));
556
557         // Verify uint translation
558         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
559         final NormalizedNode<?, ?> cars = snapshot.readNode(CarsModel.CAR_LIST_PATH).get();
560
561         assertEquals(Builders.mapBuilder()
562             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
563             // Note: Uint64
564             .withChild(createCar("one", Uint64.ONE))
565             .withChild(createCar("two", Uint64.TWO))
566             .withChild(createCar("three", Uint64.TEN))
567             .build(), cars);
568     }
569
570     private void assertCarsUint64() {
571         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
572         final NormalizedNode<?, ?> cars = snapshot.readNode(CarsModel.CAR_LIST_PATH).get();
573
574         assertEquals(Builders.mapBuilder()
575             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
576             // Note: Uint64
577             .withChild(createCar("foo", Uint64.ONE))
578             .build(), cars);
579     }
580
581     private static ContainerNode bigIntegerRoot() {
582         return Builders.containerBuilder()
583                 .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
584                 .withChild(Builders.containerBuilder()
585                     .withNodeIdentifier(new NodeIdentifier(CarsModel.CARS_QNAME))
586                     .withChild(Builders.mapBuilder()
587                         .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
588                         // Note: BigInteger
589                         .withChild(createCar("foo", BigInteger.ONE))
590                         .build())
591                     .build())
592                 .build();
593     }
594
595     private static MapEntryNode createCar(final String name, final Object value) {
596         return Builders.mapEntryBuilder()
597                 .withNodeIdentifier(NodeIdentifierWithPredicates.of(CarsModel.CAR_QNAME,CarsModel.CAR_NAME_QNAME, name))
598                 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_NAME_QNAME, name))
599                 // Note: old BigInteger
600                 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_PRICE_QNAME, value))
601                 .build();
602     }
603
604     private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
605         final ReadWriteShardDataTreeTransaction transaction =
606                 shardDataTree.newReadWriteTransaction(nextTransactionId());
607         final DataTreeModification snapshot = transaction.getSnapshot();
608         operation.execute(snapshot);
609         return shardDataTree.finishTransaction(transaction, Optional.empty());
610     }
611
612     @SuppressWarnings({ "rawtypes", "unchecked" })
613     private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
614             final Consumer<DataTreeCandidate> callback) {
615         ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
616         verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
617         for (Collection list : changes.getAllValues()) {
618             for (Object dtc : list) {
619                 callback.accept((DataTreeCandidate)dtc);
620             }
621         }
622
623         reset(listener);
624     }
625
626     private static NormalizedNode<?, ?> getCars(final ShardDataTree shardDataTree) {
627         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
628                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
629         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
630
631         final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
632
633         assertTrue(optional.isPresent());
634
635         return optional.get();
636     }
637
638     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) {
639         return addCar(shardDataTree, "altima");
640     }
641
642     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) {
643         return doTransaction(shardDataTree, snapshot -> {
644             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
645             snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
646             snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, Uint64.valueOf(100)));
647         });
648     }
649
650     private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) {
651         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
652     }
653
654     @FunctionalInterface
655     private interface DataTreeOperation {
656         void execute(DataTreeModification snapshot);
657     }
658
659     private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
660             final DataTreeOperation operation) {
661         final ReadWriteShardDataTreeTransaction transaction =
662                 shardDataTree.newReadWriteTransaction(nextTransactionId());
663         final DataTreeModification snapshot = transaction.getSnapshot();
664         operation.execute(snapshot);
665         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
666
667         immediateCanCommit(cohort);
668         immediatePreCommit(cohort);
669         final DataTreeCandidate candidate = cohort.getCandidate();
670         immediateCommit(cohort);
671
672         return candidate;
673     }
674
675     private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
676             final List<DataTreeCandidate> candidates) {
677         final ReadWriteShardDataTreeTransaction transaction =
678                 shardDataTree.newReadWriteTransaction(nextTransactionId());
679         final DataTreeModification snapshot = transaction.getSnapshot();
680         for (final DataTreeCandidate candidateTip : candidates) {
681             DataTreeCandidates.applyToModification(snapshot, candidateTip);
682         }
683         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
684
685         immediateCanCommit(cohort);
686         immediatePreCommit(cohort);
687         final DataTreeCandidate candidate = cohort.getCandidate();
688         immediateCommit(cohort);
689
690         return candidate;
691     }
692 }