Translate uint values for old streams
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardDataTreeTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.anyBoolean;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.atLeastOnce;
18 import static org.mockito.Mockito.doNothing;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.inOrder;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
26 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
27 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
28 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
29 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
30 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
31 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
32 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
33 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
34
35 import com.google.common.base.Ticker;
36 import com.google.common.primitives.UnsignedLong;
37 import com.google.common.util.concurrent.FutureCallback;
38 import java.io.IOException;
39 import java.math.BigInteger;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Optional;
46 import java.util.function.Consumer;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
53 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
54 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
55 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
56 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
57 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
59 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
61 import org.opendaylight.yangtools.yang.common.Uint64;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
65 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
69 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
75 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
76 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
77 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
78
79 public class ShardDataTreeTest extends AbstractTest {
80     private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
81
82     private final Shard mockShard = Mockito.mock(Shard.class);
83     private ShardDataTree shardDataTree;
84     private SchemaContext fullSchema;
85
86     @Before
87     public void setUp() {
88         doReturn(Ticker.systemTicker()).when(mockShard).ticker();
89         doReturn(mock(ShardStats.class)).when(mockShard).getShardMBean();
90         doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
91
92         fullSchema = SchemaContextHelper.full();
93
94         shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
95     }
96
97     @Test
98     public void testWrite() {
99         modify(false, true, true);
100     }
101
102     @Test
103     public void testMerge() {
104         modify(true, true, true);
105     }
106
107     private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) {
108         immediatePayloadReplication(shardDataTree, mockShard);
109
110         assertEquals(fullSchema, shardDataTree.getSchemaContext());
111
112         final ReadWriteShardDataTreeTransaction transaction =
113                 shardDataTree.newReadWriteTransaction(nextTransactionId());
114
115         final DataTreeModification snapshot = transaction.getSnapshot();
116
117         assertNotNull(snapshot);
118
119         if (merge) {
120             snapshot.merge(CarsModel.BASE_PATH, CarsModel.create());
121             snapshot.merge(PeopleModel.BASE_PATH, PeopleModel.create());
122         } else {
123             snapshot.write(CarsModel.BASE_PATH, CarsModel.create());
124             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
125         }
126
127         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
128
129         immediateCanCommit(cohort);
130         immediatePreCommit(cohort);
131         immediateCommit(cohort);
132
133         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
134                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
135
136         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
137
138         final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
139
140         assertEquals(expectedCarsPresent, optional.isPresent());
141
142         final Optional<NormalizedNode<?, ?>> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
143
144         assertEquals(expectedPeoplePresent, optional1.isPresent());
145     }
146
147     @Test
148     public void bug4359AddRemoveCarOnce() {
149         immediatePayloadReplication(shardDataTree, mockShard);
150
151         final List<DataTreeCandidate> candidates = new ArrayList<>();
152         candidates.add(addCar(shardDataTree));
153         candidates.add(removeCar(shardDataTree));
154
155         final NormalizedNode<?, ?> expected = getCars(shardDataTree);
156
157         applyCandidates(shardDataTree, candidates);
158
159         final NormalizedNode<?, ?> actual = getCars(shardDataTree);
160
161         assertEquals(expected, actual);
162     }
163
164     @Test
165     public void bug4359AddRemoveCarTwice() {
166         immediatePayloadReplication(shardDataTree, mockShard);
167
168         final List<DataTreeCandidate> candidates = new ArrayList<>();
169         candidates.add(addCar(shardDataTree));
170         candidates.add(removeCar(shardDataTree));
171         candidates.add(addCar(shardDataTree));
172         candidates.add(removeCar(shardDataTree));
173
174         final NormalizedNode<?, ?> expected = getCars(shardDataTree);
175
176         applyCandidates(shardDataTree, candidates);
177
178         final NormalizedNode<?, ?> actual = getCars(shardDataTree);
179
180         assertEquals(expected, actual);
181     }
182
183     @Test
184     public void testListenerNotifiedOnApplySnapshot() throws Exception {
185         immediatePayloadReplication(shardDataTree, mockShard);
186
187         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
188         shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
189             Optional.empty(), noop -> { });
190
191         addCar(shardDataTree, "optima");
192
193         verifyOnDataTreeChanged(listener, dtc -> {
194             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
195             assertEquals("getRootPath", CarsModel.newCarPath("optima"), dtc.getRootPath());
196         });
197
198         addCar(shardDataTree, "sportage");
199
200         verifyOnDataTreeChanged(listener, dtc -> {
201             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
202             assertEquals("getRootPath", CarsModel.newCarPath("sportage"), dtc.getRootPath());
203         });
204
205         ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
206         immediatePayloadReplication(newDataTree, mockShard);
207         addCar(newDataTree, "optima");
208         addCar(newDataTree, "murano");
209
210         shardDataTree.applySnapshot(newDataTree.takeStateSnapshot());
211
212         Map<YangInstanceIdentifier, ModificationType> expChanges = new HashMap<>();
213         expChanges.put(CarsModel.newCarPath("optima"), ModificationType.WRITE);
214         expChanges.put(CarsModel.newCarPath("murano"), ModificationType.WRITE);
215         expChanges.put(CarsModel.newCarPath("sportage"), ModificationType.DELETE);
216         verifyOnDataTreeChanged(listener, dtc -> {
217             ModificationType expType = expChanges.remove(dtc.getRootPath());
218             assertNotNull("Got unexpected change for " + dtc.getRootPath(), expType);
219             assertEquals("getModificationType", expType, dtc.getRootNode().getModificationType());
220         });
221
222         if (!expChanges.isEmpty()) {
223             fail("Missing change notifications: " + expChanges);
224         }
225     }
226
227     @Test
228     public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
229         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
230             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
231
232         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
233             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
234
235         NormalizedNode<?, ?> peopleNode = PeopleModel.create();
236         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
237             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
238
239         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
240         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
241         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
242
243         immediateCanCommit(cohort1);
244         final FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
245         final FutureCallback<Void> canCommitCallback3 = coordinatedCanCommit(cohort3);
246         final FutureCallback<Void> canCommitCallback4 = coordinatedCanCommit(cohort4);
247
248         final FutureCallback<DataTreeCandidate> preCommitCallback1 = coordinatedPreCommit(cohort1);
249         verify(preCommitCallback1).onSuccess(cohort1.getCandidate());
250         verify(canCommitCallback2).onSuccess(null);
251
252         final FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
253         verify(preCommitCallback2).onSuccess(cohort2.getCandidate());
254         verify(canCommitCallback3).onSuccess(null);
255
256         final FutureCallback<DataTreeCandidate> preCommitCallback3 = coordinatedPreCommit(cohort3);
257         verify(preCommitCallback3).onSuccess(cohort3.getCandidate());
258         verify(canCommitCallback4).onSuccess(null);
259
260         final FutureCallback<DataTreeCandidate> preCommitCallback4 = coordinatedPreCommit(cohort4);
261         verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
262
263         final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
264         verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
265                 anyBoolean());
266         verifyNoMoreInteractions(commitCallback2);
267
268         final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
269         verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
270                 anyBoolean());
271         verifyNoMoreInteractions(commitCallback4);
272
273         final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
274         InOrder inOrder = inOrder(mockShard);
275         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
276                 eq(true));
277         inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
278                 eq(false));
279         verifyNoMoreInteractions(commitCallback1);
280         verifyNoMoreInteractions(commitCallback2);
281
282         final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
283         inOrder = inOrder(mockShard);
284         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
285                 eq(true));
286         inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
287                 eq(false));
288         verifyNoMoreInteractions(commitCallback3);
289         verifyNoMoreInteractions(commitCallback4);
290
291         final ShardDataTreeCohort cohort5 = newShardDataTreeCohort(snapshot ->
292             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
293         final FutureCallback<Void> canCommitCallback5 = coordinatedCanCommit(cohort5);
294
295         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
296         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
297                 cohort1.getCandidate());
298         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
299         shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
300         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
301         shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
302
303         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
304         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
305         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
306         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
307         inOrder.verify(commitCallback4).onSuccess(any(UnsignedLong.class));
308
309         verify(canCommitCallback5).onSuccess(null);
310
311         final DataTreeSnapshot snapshot =
312                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
313         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
314         assertTrue("Car node present", optional.isPresent());
315         assertEquals("Car node", carNode, optional.get());
316
317         optional = snapshot.readNode(PeopleModel.BASE_PATH);
318         assertTrue("People node present", optional.isPresent());
319         assertEquals("People node", peopleNode, optional.get());
320     }
321
322     @Test
323     public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
324         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
325             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
326
327         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
328             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
329
330         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
331         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
332         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
333
334         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
335         final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
336         final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
337
338         InOrder inOrder = inOrder(mockShard);
339         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
340                 eq(true));
341         inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
342                 eq(true));
343         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
344                 eq(false));
345
346         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
347         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
348                 cohort1.getCandidate());
349         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
350         shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
351         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
352
353         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
354         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
355         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
356         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
357
358         final DataTreeSnapshot snapshot =
359                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
360         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
361         assertTrue("Car node present", optional.isPresent());
362         assertEquals("Car node", carNode, optional.get());
363     }
364
365     @Test
366     public void testPipelinedTransactionsWithImmediateReplication() {
367         immediatePayloadReplication(shardDataTree, mockShard);
368
369         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
370             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
371
372         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
373             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
374
375         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
376         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
377         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
378
379         final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
380         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
381         final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
382
383         InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
384         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
385         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
386         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
387
388         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
389         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(CarsModel.BASE_PATH);
390         assertTrue("Car node present", optional.isPresent());
391     }
392
393     @SuppressWarnings("unchecked")
394     @Test
395     public void testAbortWithPendingCommits() throws Exception {
396         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
397             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
398
399         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
400             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
401
402         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
403             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
404
405         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
406         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
407         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
408
409         coordinatedCanCommit(cohort2);
410         immediateCanCommit(cohort1);
411         coordinatedCanCommit(cohort3);
412         coordinatedCanCommit(cohort4);
413
414         coordinatedPreCommit(cohort1);
415         coordinatedPreCommit(cohort2);
416         coordinatedPreCommit(cohort3);
417
418         FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
419         doNothing().when(mockAbortCallback).onSuccess(null);
420         cohort2.abort(mockAbortCallback);
421         verify(mockAbortCallback).onSuccess(null);
422
423         coordinatedPreCommit(cohort4);
424         coordinatedCommit(cohort1);
425         coordinatedCommit(cohort3);
426         coordinatedCommit(cohort4);
427
428         InOrder inOrder = inOrder(mockShard);
429         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
430                 eq(false));
431         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
432                 eq(false));
433         inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
434                 eq(false));
435
436         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
437         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
438                 cohort1.getCandidate());
439         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
440         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
441         shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
442
443         final DataTreeSnapshot snapshot =
444                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
445         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
446         assertTrue("Car node present", optional.isPresent());
447         assertEquals("Car node", carNode, optional.get());
448     }
449
450     @SuppressWarnings("unchecked")
451     @Test
452     public void testAbortWithFailedRebase() {
453         immediatePayloadReplication(shardDataTree, mockShard);
454
455         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
456             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
457
458         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
459             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
460
461         NormalizedNode<?, ?> peopleNode = PeopleModel.create();
462         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
463             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
464
465         immediateCanCommit(cohort1);
466         FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
467
468         coordinatedPreCommit(cohort1);
469         verify(canCommitCallback2).onSuccess(null);
470
471         FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
472         doNothing().when(mockAbortCallback).onSuccess(null);
473         cohort1.abort(mockAbortCallback);
474         verify(mockAbortCallback).onSuccess(null);
475
476         FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
477         verify(preCommitCallback2).onFailure(any(Throwable.class));
478
479         immediateCanCommit(cohort3);
480         immediatePreCommit(cohort3);
481         immediateCommit(cohort3);
482
483         final DataTreeSnapshot snapshot =
484                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
485         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(PeopleModel.BASE_PATH);
486         assertTrue("People node present", optional.isPresent());
487         assertEquals("People node", peopleNode, optional.get());
488     }
489
490     @Test
491     public void testUintCommitPayload() throws IOException {
492         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(),
493             DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), bigIntegerRoot()),
494             PayloadVersion.SODIUM_SR1));
495
496         assertCarsUint64();
497     }
498
499     @Test
500     public void testUintSnapshot() throws IOException, DataValidationFailedException {
501         shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(bigIntegerRoot()),
502             true));
503
504         assertCarsUint64();
505     }
506
507     private void assertCarsUint64() {
508         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
509         final NormalizedNode<?, ?> cars = snapshot.readNode(CarsModel.CAR_LIST_PATH).get();
510
511         assertEquals(Builders.mapBuilder()
512             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
513             .withChild(Builders.mapEntryBuilder()
514                 .withNodeIdentifier(NodeIdentifierWithPredicates.of(CarsModel.CAR_QNAME,
515                     CarsModel.CAR_NAME_QNAME, "foo"))
516                 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_NAME_QNAME, "foo"))
517                 // Note: Uint64
518                 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_PRICE_QNAME, Uint64.ONE))
519                 .build())
520             .build(), cars);
521     }
522
523     private static ContainerNode bigIntegerRoot() {
524         return Builders.containerBuilder()
525                 .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
526                 .withChild(Builders.containerBuilder()
527                     .withNodeIdentifier(new NodeIdentifier(CarsModel.CARS_QNAME))
528                     .withChild(Builders.mapBuilder()
529                         .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
530                         .withChild(Builders.mapEntryBuilder()
531                             .withNodeIdentifier(NodeIdentifierWithPredicates.of(CarsModel.CAR_QNAME,
532                                 CarsModel.CAR_NAME_QNAME, "foo"))
533                             .withChild(ImmutableNodes.leafNode(CarsModel.CAR_NAME_QNAME, "foo"))
534                             // Note: old BigInteger
535                             .withChild(ImmutableNodes.leafNode(CarsModel.CAR_PRICE_QNAME, BigInteger.ONE))
536                             .build())
537                         .build())
538                     .build())
539                 .build();
540     }
541
542     private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
543         final ReadWriteShardDataTreeTransaction transaction =
544                 shardDataTree.newReadWriteTransaction(nextTransactionId());
545         final DataTreeModification snapshot = transaction.getSnapshot();
546         operation.execute(snapshot);
547         return shardDataTree.finishTransaction(transaction, Optional.empty());
548     }
549
550     @SuppressWarnings({ "rawtypes", "unchecked" })
551     private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
552             final Consumer<DataTreeCandidate> callback) {
553         ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
554         verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
555         for (Collection list : changes.getAllValues()) {
556             for (Object dtc : list) {
557                 callback.accept((DataTreeCandidate)dtc);
558             }
559         }
560
561         reset(listener);
562     }
563
564     private static NormalizedNode<?, ?> getCars(final ShardDataTree shardDataTree) {
565         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
566                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
567         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
568
569         final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
570
571         assertTrue(optional.isPresent());
572
573         return optional.get();
574     }
575
576     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) {
577         return addCar(shardDataTree, "altima");
578     }
579
580     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) {
581         return doTransaction(shardDataTree, snapshot -> {
582             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
583             snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
584             snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, Uint64.valueOf(100)));
585         });
586     }
587
588     private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) {
589         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
590     }
591
592     @FunctionalInterface
593     private interface DataTreeOperation {
594         void execute(DataTreeModification snapshot);
595     }
596
597     private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
598             final DataTreeOperation operation) {
599         final ReadWriteShardDataTreeTransaction transaction =
600                 shardDataTree.newReadWriteTransaction(nextTransactionId());
601         final DataTreeModification snapshot = transaction.getSnapshot();
602         operation.execute(snapshot);
603         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
604
605         immediateCanCommit(cohort);
606         immediatePreCommit(cohort);
607         final DataTreeCandidate candidate = cohort.getCandidate();
608         immediateCommit(cohort);
609
610         return candidate;
611     }
612
613     private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
614             final List<DataTreeCandidate> candidates) {
615         final ReadWriteShardDataTreeTransaction transaction =
616                 shardDataTree.newReadWriteTransaction(nextTransactionId());
617         final DataTreeModification snapshot = transaction.getSnapshot();
618         for (final DataTreeCandidate candidateTip : candidates) {
619             DataTreeCandidates.applyToModification(snapshot, candidateTip);
620         }
621         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
622
623         immediateCanCommit(cohort);
624         immediatePreCommit(cohort);
625         final DataTreeCandidate candidate = cohort.getCandidate();
626         immediateCommit(cohort);
627
628         return candidate;
629     }
630 }