Migrate users of Optional.get()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardDataTreeTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.anyBoolean;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.atLeastOnce;
18 import static org.mockito.Mockito.doNothing;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.inOrder;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
26 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
27 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
28 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
29 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
30 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
31 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
32 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
33 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
34
35 import com.google.common.base.Ticker;
36 import com.google.common.primitives.UnsignedLong;
37 import com.google.common.util.concurrent.FutureCallback;
38 import java.io.IOException;
39 import java.math.BigInteger;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.HashMap;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Optional;
46 import java.util.function.Consumer;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
53 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
54 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
55 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
56 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
57 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
60 import org.opendaylight.yangtools.yang.common.Empty;
61 import org.opendaylight.yangtools.yang.common.Uint64;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
65 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
69 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
70 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
71 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
72 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
73 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
74 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
75 import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
76 import org.opendaylight.yangtools.yang.data.tree.api.ModificationType;
77 import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
78 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
79 import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
80 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
81 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
82
83 public class ShardDataTreeTest extends AbstractTest {
84     private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
85
86     private final Shard mockShard = Mockito.mock(Shard.class);
87     private ShardDataTree shardDataTree;
88     private EffectiveModelContext fullSchema;
89
90     @Before
91     public void setUp() {
92         doReturn(Ticker.systemTicker()).when(mockShard).ticker();
93         doReturn(new ShardStats("shardName", "mxBeanType", mockShard)).when(mockShard).getShardMBean();
94         doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
95
96         fullSchema = SchemaContextHelper.full();
97
98         shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
99     }
100
101     @Test
102     public void testWrite() {
103         modify(false, true, true);
104     }
105
106     @Test
107     public void testMerge() {
108         modify(true, true, true);
109     }
110
111     private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) {
112         immediatePayloadReplication(shardDataTree, mockShard);
113
114         assertEquals(fullSchema, shardDataTree.getSchemaContext());
115
116         final ReadWriteShardDataTreeTransaction transaction =
117                 shardDataTree.newReadWriteTransaction(nextTransactionId());
118
119         final DataTreeModification snapshot = transaction.getSnapshot();
120
121         assertNotNull(snapshot);
122
123         if (merge) {
124             snapshot.merge(CarsModel.BASE_PATH, CarsModel.create());
125             snapshot.merge(PeopleModel.BASE_PATH, PeopleModel.create());
126         } else {
127             snapshot.write(CarsModel.BASE_PATH, CarsModel.create());
128             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
129         }
130
131         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
132
133         immediateCanCommit(cohort);
134         immediatePreCommit(cohort);
135         immediateCommit(cohort);
136
137         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
138                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
139
140         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
141
142         final Optional<NormalizedNode> optional = snapshot1.readNode(CarsModel.BASE_PATH);
143
144         assertEquals(expectedCarsPresent, optional.isPresent());
145
146         final Optional<NormalizedNode> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
147
148         assertEquals(expectedPeoplePresent, optional1.isPresent());
149     }
150
151     @Test
152     public void bug4359AddRemoveCarOnce() {
153         immediatePayloadReplication(shardDataTree, mockShard);
154
155         final List<DataTreeCandidate> candidates = new ArrayList<>();
156         candidates.add(addCar(shardDataTree));
157         candidates.add(removeCar(shardDataTree));
158
159         final NormalizedNode expected = getCars(shardDataTree);
160
161         applyCandidates(shardDataTree, candidates);
162
163         final NormalizedNode actual = getCars(shardDataTree);
164
165         assertEquals(expected, actual);
166     }
167
168     @Test
169     public void bug4359AddRemoveCarTwice() {
170         immediatePayloadReplication(shardDataTree, mockShard);
171
172         final List<DataTreeCandidate> candidates = new ArrayList<>();
173         candidates.add(addCar(shardDataTree));
174         candidates.add(removeCar(shardDataTree));
175         candidates.add(addCar(shardDataTree));
176         candidates.add(removeCar(shardDataTree));
177
178         final NormalizedNode expected = getCars(shardDataTree);
179
180         applyCandidates(shardDataTree, candidates);
181
182         final NormalizedNode actual = getCars(shardDataTree);
183
184         assertEquals(expected, actual);
185     }
186
187     @Test
188     public void testListenerNotifiedOnApplySnapshot() throws Exception {
189         immediatePayloadReplication(shardDataTree, mockShard);
190
191         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
192         shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
193             Optional.empty(), noop -> { });
194
195         addCar(shardDataTree, "optima");
196
197         verifyOnDataTreeChanged(listener, dtc -> {
198             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
199             assertEquals("getRootPath", CarsModel.newCarPath("optima"), dtc.getRootPath());
200         });
201
202         addCar(shardDataTree, "sportage");
203
204         verifyOnDataTreeChanged(listener, dtc -> {
205             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
206             assertEquals("getRootPath", CarsModel.newCarPath("sportage"), dtc.getRootPath());
207         });
208
209         ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
210         immediatePayloadReplication(newDataTree, mockShard);
211         addCar(newDataTree, "optima");
212         addCar(newDataTree, "murano");
213
214         shardDataTree.applySnapshot(newDataTree.takeStateSnapshot());
215
216         Map<YangInstanceIdentifier, ModificationType> expChanges = new HashMap<>();
217         expChanges.put(CarsModel.newCarPath("optima"), ModificationType.WRITE);
218         expChanges.put(CarsModel.newCarPath("murano"), ModificationType.WRITE);
219         expChanges.put(CarsModel.newCarPath("sportage"), ModificationType.DELETE);
220         verifyOnDataTreeChanged(listener, dtc -> {
221             ModificationType expType = expChanges.remove(dtc.getRootPath());
222             assertNotNull("Got unexpected change for " + dtc.getRootPath(), expType);
223             assertEquals("getModificationType", expType, dtc.getRootNode().getModificationType());
224         });
225
226         if (!expChanges.isEmpty()) {
227             fail("Missing change notifications: " + expChanges);
228         }
229     }
230
231     @Test
232     public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
233         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
234             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
235
236         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
237             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
238
239         NormalizedNode peopleNode = PeopleModel.create();
240         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
241             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
242
243         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
244         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
245         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
246
247         immediateCanCommit(cohort1);
248         final FutureCallback<Empty> canCommitCallback2 = coordinatedCanCommit(cohort2);
249         final FutureCallback<Empty> canCommitCallback3 = coordinatedCanCommit(cohort3);
250         final FutureCallback<Empty> canCommitCallback4 = coordinatedCanCommit(cohort4);
251
252         final FutureCallback<DataTreeCandidate> preCommitCallback1 = coordinatedPreCommit(cohort1);
253         verify(preCommitCallback1).onSuccess(cohort1.getCandidate());
254         verify(canCommitCallback2).onSuccess(Empty.value());
255
256         final FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
257         verify(preCommitCallback2).onSuccess(cohort2.getCandidate());
258         verify(canCommitCallback3).onSuccess(Empty.value());
259
260         final FutureCallback<DataTreeCandidate> preCommitCallback3 = coordinatedPreCommit(cohort3);
261         verify(preCommitCallback3).onSuccess(cohort3.getCandidate());
262         verify(canCommitCallback4).onSuccess(Empty.value());
263
264         final FutureCallback<DataTreeCandidate> preCommitCallback4 = coordinatedPreCommit(cohort4);
265         verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
266
267         final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
268         verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
269                 anyBoolean());
270         verifyNoMoreInteractions(commitCallback2);
271
272         final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
273         verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
274                 anyBoolean());
275         verifyNoMoreInteractions(commitCallback4);
276
277         final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
278         InOrder inOrder = inOrder(mockShard);
279         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
280                 eq(true));
281         inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
282                 eq(false));
283         verifyNoMoreInteractions(commitCallback1);
284         verifyNoMoreInteractions(commitCallback2);
285
286         final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
287         inOrder = inOrder(mockShard);
288         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
289                 eq(true));
290         inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
291                 eq(false));
292         verifyNoMoreInteractions(commitCallback3);
293         verifyNoMoreInteractions(commitCallback4);
294
295         final ShardDataTreeCohort cohort5 = newShardDataTreeCohort(snapshot ->
296             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
297         final FutureCallback<Empty> canCommitCallback5 = coordinatedCanCommit(cohort5);
298
299         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
300         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
301                 cohort1.getCandidate());
302         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
303         shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
304         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
305         shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
306
307         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
308         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
309         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
310         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
311         inOrder.verify(commitCallback4).onSuccess(any(UnsignedLong.class));
312
313         verify(canCommitCallback5).onSuccess(Empty.value());
314
315         final DataTreeSnapshot snapshot =
316                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
317         assertEquals("Car node", Optional.of(carNode), snapshot.readNode(carPath));
318         assertEquals("People node", Optional.of(peopleNode), snapshot.readNode(PeopleModel.BASE_PATH));
319     }
320
321     @Test
322     public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
323         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
324             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
325
326         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
327             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
328
329         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
330         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
331         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
332
333         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
334         final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
335         final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
336
337         InOrder inOrder = inOrder(mockShard);
338         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
339                 eq(true));
340         inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
341                 eq(true));
342         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
343                 eq(false));
344
345         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
346         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
347                 cohort1.getCandidate());
348         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
349         shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
350         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
351
352         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
353         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
354         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
355         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
356
357         final DataTreeSnapshot snapshot =
358                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
359         assertEquals("Car node", Optional.of(carNode), snapshot.readNode(carPath));
360     }
361
362     @Test
363     public void testPipelinedTransactionsWithImmediateReplication() {
364         immediatePayloadReplication(shardDataTree, mockShard);
365
366         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
367             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
368
369         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
370             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
371
372         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
373         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
374         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
375
376         final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
377         final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
378         final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
379
380         InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
381         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
382         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
383         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
384
385         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
386         Optional<NormalizedNode> optional = snapshot.readNode(CarsModel.BASE_PATH);
387         assertTrue("Car node present", optional.isPresent());
388     }
389
390     @SuppressWarnings("unchecked")
391     @Test
392     public void testAbortWithPendingCommits() throws Exception {
393         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
394             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
395
396         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
397             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
398
399         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
400             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
401
402         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
403         MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
404         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
405
406         coordinatedCanCommit(cohort2);
407         immediateCanCommit(cohort1);
408         coordinatedCanCommit(cohort3);
409         coordinatedCanCommit(cohort4);
410
411         coordinatedPreCommit(cohort1);
412         coordinatedPreCommit(cohort2);
413         coordinatedPreCommit(cohort3);
414
415         FutureCallback<Empty> mockAbortCallback = mock(FutureCallback.class);
416         doNothing().when(mockAbortCallback).onSuccess(Empty.value());
417         cohort2.abort(mockAbortCallback);
418         verify(mockAbortCallback).onSuccess(Empty.value());
419
420         coordinatedPreCommit(cohort4);
421         coordinatedCommit(cohort1);
422         coordinatedCommit(cohort3);
423         coordinatedCommit(cohort4);
424
425         InOrder inOrder = inOrder(mockShard);
426         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
427                 eq(false));
428         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
429                 eq(false));
430         inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
431                 eq(false));
432
433         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
434         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
435                 cohort1.getCandidate());
436         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
437         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
438         shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
439
440         final DataTreeSnapshot snapshot =
441                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
442         Optional<NormalizedNode> optional = snapshot.readNode(carPath);
443         assertEquals("Car node", Optional.of(carNode), optional);
444     }
445
446     @SuppressWarnings("unchecked")
447     @Test
448     public void testAbortWithFailedRebase() {
449         immediatePayloadReplication(shardDataTree, mockShard);
450
451         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
452             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
453
454         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
455             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
456
457         NormalizedNode peopleNode = PeopleModel.create();
458         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
459             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
460
461         immediateCanCommit(cohort1);
462         FutureCallback<Empty> canCommitCallback2 = coordinatedCanCommit(cohort2);
463
464         coordinatedPreCommit(cohort1);
465         verify(canCommitCallback2).onSuccess(Empty.value());
466
467         FutureCallback<Empty> mockAbortCallback = mock(FutureCallback.class);
468         doNothing().when(mockAbortCallback).onSuccess(Empty.value());
469         cohort1.abort(mockAbortCallback);
470         verify(mockAbortCallback).onSuccess(Empty.value());
471
472         FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
473         verify(preCommitCallback2).onFailure(any(Throwable.class));
474
475         immediateCanCommit(cohort3);
476         immediatePreCommit(cohort3);
477         immediateCommit(cohort3);
478
479         final DataTreeSnapshot snapshot =
480                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
481         Optional<NormalizedNode> optional = snapshot.readNode(PeopleModel.BASE_PATH);
482         assertEquals("People node", Optional.of(peopleNode), optional);
483     }
484
485     @Test
486     public void testUintCommitPayload() throws IOException {
487         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(),
488             DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), bigIntegerRoot()),
489             PayloadVersion.SODIUM_SR1));
490
491         assertCarsUint64();
492     }
493
494     @Test
495     public void testUintSnapshot() throws IOException, DataValidationFailedException {
496         shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(bigIntegerRoot()),
497             true));
498
499         assertCarsUint64();
500     }
501
502     @Test
503     public void testUintReplay() throws DataValidationFailedException, IOException {
504         // Commit two writes and one merge, saving the data tree candidate for each.
505         //        write(foo=1)
506         //        write(foo=2)
507         //        merge(foo=3)
508         final DataTree dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
509             fullSchema);
510         DataTreeModification mod = dataTree.takeSnapshot().newModification();
511         mod.write(CarsModel.BASE_PATH, Builders.containerBuilder()
512                 .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
513                 .withChild(Builders.mapBuilder()
514                     .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
515                     .withChild(createCar("one", BigInteger.ONE))
516                     .build())
517                 .build());
518         mod.ready();
519         dataTree.validate(mod);
520         final DataTreeCandidate first = dataTree.prepare(mod);
521         dataTree.commit(first);
522
523         mod = dataTree.takeSnapshot().newModification();
524         mod.write(CarsModel.newCarPath("two"), createCar("two", BigInteger.TWO));
525         mod.ready();
526         dataTree.validate(mod);
527         final DataTreeCandidate second = dataTree.prepare(mod);
528         dataTree.commit(second);
529
530         mod = dataTree.takeSnapshot().newModification();
531         mod.merge(CarsModel.CAR_LIST_PATH, Builders.mapBuilder()
532             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
533             .withChild(createCar("three", BigInteger.TEN))
534             .build());
535         mod.ready();
536         dataTree.validate(mod);
537         final DataTreeCandidate third = dataTree.prepare(mod);
538         dataTree.commit(third);
539
540         // Apply first candidate as a snapshot
541         shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(
542             new MetadataShardDataTreeSnapshot(first.getRootNode().getDataAfter().orElseThrow()), true));
543         // Apply the other two snapshots as transactions
544         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), second,
545             PayloadVersion.SODIUM_SR1));
546         shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), third,
547             PayloadVersion.SODIUM_SR1));
548
549         // Verify uint translation
550         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
551
552         assertEquals(Builders.mapBuilder()
553             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
554             // Note: Uint64
555             .withChild(createCar("one", Uint64.ONE))
556             .withChild(createCar("two", Uint64.TWO))
557             .withChild(createCar("three", Uint64.TEN))
558             .build(), snapshot.readNode(CarsModel.CAR_LIST_PATH).orElseThrow());
559     }
560
561     private void assertCarsUint64() {
562         final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
563         final NormalizedNode cars = snapshot.readNode(CarsModel.CAR_LIST_PATH).orElseThrow();
564
565         assertEquals(Builders.mapBuilder()
566             .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
567             // Note: Uint64
568             .withChild(createCar("foo", Uint64.ONE))
569             .build(), cars);
570     }
571
572     private static ContainerNode bigIntegerRoot() {
573         return Builders.containerBuilder()
574                 .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
575                 .withChild(Builders.containerBuilder()
576                     .withNodeIdentifier(new NodeIdentifier(CarsModel.CARS_QNAME))
577                     .withChild(Builders.mapBuilder()
578                         .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
579                         // Note: BigInteger
580                         .withChild(createCar("foo", BigInteger.ONE))
581                         .build())
582                     .build())
583                 .build();
584     }
585
586     private static MapEntryNode createCar(final String name, final Object value) {
587         return Builders.mapEntryBuilder()
588                 .withNodeIdentifier(NodeIdentifierWithPredicates.of(CarsModel.CAR_QNAME,CarsModel.CAR_NAME_QNAME, name))
589                 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_NAME_QNAME, name))
590                 // Note: old BigInteger
591                 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_PRICE_QNAME, value))
592                 .build();
593     }
594
595     private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
596         final ReadWriteShardDataTreeTransaction transaction =
597                 shardDataTree.newReadWriteTransaction(nextTransactionId());
598         final DataTreeModification snapshot = transaction.getSnapshot();
599         operation.execute(snapshot);
600         return shardDataTree.finishTransaction(transaction, Optional.empty());
601     }
602
603     @SuppressWarnings({ "rawtypes", "unchecked" })
604     private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
605             final Consumer<DataTreeCandidate> callback) {
606         ArgumentCaptor<List> changes = ArgumentCaptor.forClass(List.class);
607         verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
608         for (Collection list : changes.getAllValues()) {
609             for (Object dtc : list) {
610                 callback.accept((DataTreeCandidate)dtc);
611             }
612         }
613
614         reset(listener);
615     }
616
617     private static NormalizedNode getCars(final ShardDataTree shardDataTree) {
618         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
619                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
620         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
621
622         final Optional<NormalizedNode> optional = snapshot1.readNode(CarsModel.BASE_PATH);
623
624         assertTrue(optional.isPresent());
625
626         return optional.orElseThrow();
627     }
628
629     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) {
630         return addCar(shardDataTree, "altima");
631     }
632
633     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) {
634         return doTransaction(shardDataTree, snapshot -> {
635             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
636             snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
637             snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, Uint64.valueOf(100)));
638         });
639     }
640
641     private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) {
642         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
643     }
644
645     @FunctionalInterface
646     private interface DataTreeOperation {
647         void execute(DataTreeModification snapshot);
648     }
649
650     private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
651             final DataTreeOperation operation) {
652         final ReadWriteShardDataTreeTransaction transaction =
653                 shardDataTree.newReadWriteTransaction(nextTransactionId());
654         final DataTreeModification snapshot = transaction.getSnapshot();
655         operation.execute(snapshot);
656         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
657
658         immediateCanCommit(cohort);
659         immediatePreCommit(cohort);
660         final DataTreeCandidate candidate = cohort.getCandidate();
661         immediateCommit(cohort);
662
663         return candidate;
664     }
665
666     private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
667             final List<DataTreeCandidate> candidates) {
668         final ReadWriteShardDataTreeTransaction transaction =
669                 shardDataTree.newReadWriteTransaction(nextTransactionId());
670         final DataTreeModification snapshot = transaction.getSnapshot();
671         for (final DataTreeCandidate candidateTip : candidates) {
672             DataTreeCandidates.applyToModification(snapshot, candidateTip);
673         }
674         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
675
676         immediateCanCommit(cohort);
677         immediatePreCommit(cohort);
678         final DataTreeCandidate candidate = cohort.getCandidate();
679         immediateCommit(cohort);
680
681         return candidate;
682     }
683 }