2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.anyBoolean;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.atLeastOnce;
18 import static org.mockito.Mockito.doNothing;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.inOrder;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
26 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
27 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
28 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
29 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
30 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
31 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
32 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
33 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
35 import com.google.common.base.Ticker;
36 import com.google.common.primitives.UnsignedLong;
37 import com.google.common.util.concurrent.FutureCallback;
38 import java.io.IOException;
39 import java.math.BigInteger;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.HashMap;
43 import java.util.List;
45 import java.util.Optional;
46 import java.util.function.Consumer;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
53 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
54 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
55 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
56 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
57 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
58 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
60 import org.opendaylight.yangtools.yang.common.Uint64;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
64 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
65 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
68 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
69 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
70 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
73 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
75 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
76 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
79 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
80 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Unit tests for {@link ShardDataTree}, run against a Mockito mock of the owning {@link Shard}.
 */
82 public class ShardDataTreeTest extends AbstractTest {
// Datastore context built from the builder's defaults; returned by the mock shard's getDatastoreContext().
83 private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
// Mocked shard passed to every ShardDataTree these tests construct.
85 private final Shard mockShard = Mockito.mock(Shard.class);
// Tree under test; assigned by the fixture statements further down.
86 private ShardDataTree shardDataTree;
// Schema context loaded through SchemaContextHelper.full() by the fixture statements.
87 private EffectiveModelContext fullSchema;
// NOTE(review): the header of the enclosing method is not visible in this listing. These statements look like
// the JUnit fixture (org.junit.Before is imported) — confirm against the complete file.
// Stub the collaborators the mock shard must provide: a ticker, a ShardStats bean and the datastore context.
91 doReturn(Ticker.systemTicker()).when(mockShard).ticker();
92 doReturn(new ShardStats("shardName", "mxBeanType", mockShard)).when(mockShard).getShardMBean();
93 doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
// Load the full test schema, then build the OPERATIONAL tree under test on top of the mock shard.
95 fullSchema = SchemaContextHelper.full();
97 shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
101 public void testWrite() {
102 modify(false, true, true);
106 public void testMerge() {
107 modify(true, true, true);
/**
 * Shared scenario behind testWrite/testMerge: populates the cars and people containers in one read-write
 * transaction, commits it through all three phases and then checks, via a fresh read-only transaction,
 * whether each container is present.
 *
 * @param merge selects the modification flavour. NOTE(review): the branch on this flag is not visible in this
 *     listing; presumably {@code true} runs the merge() pair and {@code false} the write() pair — confirm.
 * @param expectedCarsPresent whether a node is expected at {@code CarsModel.BASE_PATH} after the commit
 * @param expectedPeoplePresent whether a node is expected at {@code PeopleModel.BASE_PATH} after the commit
 */
110 private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) {
111 immediatePayloadReplication(shardDataTree, mockShard);
    // The tree must report the schema context it was constructed with.
113 assertEquals(fullSchema, shardDataTree.getSchemaContext());
115 final ReadWriteShardDataTreeTransaction transaction =
116 shardDataTree.newReadWriteTransaction(nextTransactionId());
118 final DataTreeModification snapshot = transaction.getSnapshot();
120 assertNotNull(snapshot);
    // Merge flavour of the two container updates.
123 snapshot.merge(CarsModel.BASE_PATH, CarsModel.create());
124 snapshot.merge(PeopleModel.BASE_PATH, PeopleModel.create());
    // Write flavour of the same two container updates.
126 snapshot.write(CarsModel.BASE_PATH, CarsModel.create());
127 snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
    // Seal the transaction and drive the resulting cohort through canCommit, preCommit and commit.
130 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
132 immediateCanCommit(cohort);
133 immediatePreCommit(cohort);
134 immediateCommit(cohort);
    // Read both containers back through a new read-only transaction.
136 final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
137 shardDataTree.newReadOnlyTransaction(nextTransactionId());
139 final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
141 final Optional<NormalizedNode> optional = snapshot1.readNode(CarsModel.BASE_PATH);
143 assertEquals(expectedCarsPresent, optional.isPresent());
145 final Optional<NormalizedNode> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
147 assertEquals(expectedPeoplePresent, optional1.isPresent());
151 public void bug4359AddRemoveCarOnce() {
152 immediatePayloadReplication(shardDataTree, mockShard);
154 final List<DataTreeCandidate> candidates = new ArrayList<>();
155 candidates.add(addCar(shardDataTree));
156 candidates.add(removeCar(shardDataTree));
158 final NormalizedNode expected = getCars(shardDataTree);
160 applyCandidates(shardDataTree, candidates);
162 final NormalizedNode actual = getCars(shardDataTree);
164 assertEquals(expected, actual);
168 public void bug4359AddRemoveCarTwice() {
169 immediatePayloadReplication(shardDataTree, mockShard);
171 final List<DataTreeCandidate> candidates = new ArrayList<>();
172 candidates.add(addCar(shardDataTree));
173 candidates.add(removeCar(shardDataTree));
174 candidates.add(addCar(shardDataTree));
175 candidates.add(removeCar(shardDataTree));
177 final NormalizedNode expected = getCars(shardDataTree);
179 applyCandidates(shardDataTree, candidates);
181 final NormalizedNode actual = getCars(shardDataTree);
183 assertEquals(expected, actual);
187 public void testListenerNotifiedOnApplySnapshot() throws Exception {
188 immediatePayloadReplication(shardDataTree, mockShard);
190 DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
191 shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
192 Optional.empty(), noop -> { });
194 addCar(shardDataTree, "optima");
196 verifyOnDataTreeChanged(listener, dtc -> {
197 assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
198 assertEquals("getRootPath", CarsModel.newCarPath("optima"), dtc.getRootPath());
201 addCar(shardDataTree, "sportage");
203 verifyOnDataTreeChanged(listener, dtc -> {
204 assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
205 assertEquals("getRootPath", CarsModel.newCarPath("sportage"), dtc.getRootPath());
208 ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
209 immediatePayloadReplication(newDataTree, mockShard);
210 addCar(newDataTree, "optima");
211 addCar(newDataTree, "murano");
213 shardDataTree.applySnapshot(newDataTree.takeStateSnapshot());
215 Map<YangInstanceIdentifier, ModificationType> expChanges = new HashMap<>();
216 expChanges.put(CarsModel.newCarPath("optima"), ModificationType.WRITE);
217 expChanges.put(CarsModel.newCarPath("murano"), ModificationType.WRITE);
218 expChanges.put(CarsModel.newCarPath("sportage"), ModificationType.DELETE);
219 verifyOnDataTreeChanged(listener, dtc -> {
220 ModificationType expType = expChanges.remove(dtc.getRootPath());
221 assertNotNull("Got unexpected change for " + dtc.getRootPath(), expType);
222 assertEquals("getModificationType", expType, dtc.getRootNode().getModificationType());
225 if (!expChanges.isEmpty()) {
226 fail("Missing change notifications: " + expChanges);
/**
 * Pipelines four dependent transactions (cars container, car list, people container, one car entry) through
 * coordinated canCommit/preCommit/commit calls issued out of order, and checks that payloads are handed to
 * the shard in cohort order, that commit callbacks fire in cohort order once the payloads are applied, and
 * that the written data is readable afterwards.
 *
 * NOTE(review): the trailing argument of every persistPayload(...) verification is missing from this listing.
 */
231 public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
    // Four cohorts, each building on data written by an earlier one (cohort3 is independent people data).
232 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
233 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
235 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
236 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
238 NormalizedNode peopleNode = PeopleModel.create();
239 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
240 snapshot.write(PeopleModel.BASE_PATH, peopleNode));
242 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
243 MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
244 final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
    // canCommit phase: cohort1 via the immediate helper, cohorts 2-4 via the coordinated helper.
246 immediateCanCommit(cohort1);
247 final FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
248 final FutureCallback<Void> canCommitCallback3 = coordinatedCanCommit(cohort3);
249 final FutureCallback<Void> canCommitCallback4 = coordinatedCanCommit(cohort4);
    // preCommit phase: after each cohort's preCommit succeeds, the next cohort's canCommit callback is
    // verified to have succeeded.
251 final FutureCallback<DataTreeCandidate> preCommitCallback1 = coordinatedPreCommit(cohort1);
252 verify(preCommitCallback1).onSuccess(cohort1.getCandidate());
253 verify(canCommitCallback2).onSuccess(null);
255 final FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
256 verify(preCommitCallback2).onSuccess(cohort2.getCandidate());
257 verify(canCommitCallback3).onSuccess(null);
259 final FutureCallback<DataTreeCandidate> preCommitCallback3 = coordinatedPreCommit(cohort3);
260 verify(preCommitCallback3).onSuccess(cohort3.getCandidate());
261 verify(canCommitCallback4).onSuccess(null);
263 final FutureCallback<DataTreeCandidate> preCommitCallback4 = coordinatedPreCommit(cohort4);
264 verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
    // Commit cohort2 ahead of cohort1: nothing may be persisted yet and its callback stays untouched.
    // NOTE(review): this check inspects cohort1's identifier although cohort2 was just committed, while the
    // matching check for cohort4 below inspects cohort4's own identifier — confirm this is intentional.
266 final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
267 verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
269 verifyNoMoreInteractions(commitCallback2);
    // Commit cohort4 ahead of cohort3: likewise no payload for it yet.
271 final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
272 verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
274 verifyNoMoreInteractions(commitCallback4);
    // Committing cohort1 hands the payloads of cohort1 and then cohort2 to the shard, in that order.
276 final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
277 InOrder inOrder = inOrder(mockShard);
278 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
280 inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
282 verifyNoMoreInteractions(commitCallback1);
283 verifyNoMoreInteractions(commitCallback2);
    // Committing cohort3 hands over cohort3 and then cohort4.
285 final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
286 inOrder = inOrder(mockShard);
287 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
289 inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
291 verifyNoMoreInteractions(commitCallback3);
292 verifyNoMoreInteractions(commitCallback4);
    // A fifth cohort is queued while the first four are still awaiting their applied payloads.
294 final ShardDataTreeCohort cohort5 = newShardDataTreeCohort(snapshot ->
295 snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
296 final FutureCallback<Void> canCommitCallback5 = coordinatedCanCommit(cohort5);
298 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
299 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
300 cohort1.getCandidate());
    // Applying the replicated payloads, one per cohort identifier, completes the pending commits.
301 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
302 shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
303 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
304 shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
    // Commit callbacks must have succeeded in cohort order.
306 inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
307 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
308 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
309 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
310 inOrder.verify(commitCallback4).onSuccess(any(UnsignedLong.class));
    // cohort5's canCommit callback has succeeded by now.
312 verify(canCommitCallback5).onSuccess(null);
    // Read the car entry and the people container back and compare them with what was written.
314 final DataTreeSnapshot snapshot =
315 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
316 Optional<NormalizedNode> optional = snapshot.readNode(carPath);
317 assertTrue("Car node present", optional.isPresent());
318 assertEquals("Car node", carNode, optional.get());
320 optional = snapshot.readNode(PeopleModel.BASE_PATH);
321 assertTrue("People node present", optional.isPresent());
322 assertEquals("People node", peopleNode, optional.get());
/**
 * Creates three dependent cohorts, runs the immediate three-phase commit helper on them out of creation order
 * (2, 3, then 1) and checks that payloads still reach the shard in cohort order, that commit callbacks fire
 * in cohort order once the payloads are applied, and that the car entry is readable afterwards.
 *
 * NOTE(review): the trailing argument of every persistPayload(...) verification is missing from this listing.
 */
326 public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
327 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
328 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
330 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
331 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
333 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
334 MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
335 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
    // Deliberately start the commits as 2, 3, 1.
337 final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
338 final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
339 final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
    // Payloads must nevertheless be handed to the shard as 1, 2, 3.
341 InOrder inOrder = inOrder(mockShard);
342 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
344 inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
346 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
349 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
350 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
351 cohort1.getCandidate());
352 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
353 shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
354 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
    // Commit callbacks must have succeeded in cohort order.
356 inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
357 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
358 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
359 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
    // The car entry written by cohort3 must be readable and equal to what was written.
361 final DataTreeSnapshot snapshot =
362 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
363 Optional<NormalizedNode> optional = snapshot.readNode(carPath);
364 assertTrue("Car node present", optional.isPresent());
365 assertEquals("Car node", carNode, optional.get());
369 public void testPipelinedTransactionsWithImmediateReplication() {
370 immediatePayloadReplication(shardDataTree, mockShard);
372 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
373 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
375 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
376 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
378 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
379 MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
380 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
382 final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
383 final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
384 final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
386 InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
387 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
388 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
389 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
391 final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
392 Optional<NormalizedNode> optional = snapshot.readNode(CarsModel.BASE_PATH);
393 assertTrue("Car node present", optional.isPresent());
/**
 * Queues four cohorts, aborts the second one after the first three have been pre-committed, then commits the
 * remaining three and checks that only their payloads reach the shard, in order, and that the car entry is
 * readable once the payloads are applied.
 *
 * NOTE(review): the trailing argument of every persistPayload(...) verification is missing from this listing.
 */
396 @SuppressWarnings("unchecked")
398 public void testAbortWithPendingCommits() throws Exception {
399 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
400 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
402 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
403 snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
405 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
406 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
408 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
409 MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
410 final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
    // canCommit is requested for cohort2 before cohort1.
412 coordinatedCanCommit(cohort2);
413 immediateCanCommit(cohort1);
414 coordinatedCanCommit(cohort3);
415 coordinatedCanCommit(cohort4);
417 coordinatedPreCommit(cohort1);
418 coordinatedPreCommit(cohort2);
419 coordinatedPreCommit(cohort3);
    // Abort cohort2 (the people write) while cohorts 1 and 3 are pre-committed; the abort callback must succeed.
421 FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
422 doNothing().when(mockAbortCallback).onSuccess(null);
423 cohort2.abort(mockAbortCallback);
424 verify(mockAbortCallback).onSuccess(null);
    // Continue with the surviving cohorts.
426 coordinatedPreCommit(cohort4);
427 coordinatedCommit(cohort1);
428 coordinatedCommit(cohort3);
429 coordinatedCommit(cohort4);
    // Payloads for cohorts 1, 3 and 4 are handed to the shard in that order.
431 InOrder inOrder = inOrder(mockShard);
432 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
434 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
436 inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
439 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
440 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
441 cohort1.getCandidate());
442 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
443 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
444 shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
    // The car entry written by cohort4 must be readable and equal to what was written.
446 final DataTreeSnapshot snapshot =
447 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
448 Optional<NormalizedNode> optional = snapshot.readNode(carPath);
449 assertTrue("Car node present", optional.isPresent());
450 assertEquals("Car node", carNode, optional.get());
453 @SuppressWarnings("unchecked")
455 public void testAbortWithFailedRebase() {
456 immediatePayloadReplication(shardDataTree, mockShard);
458 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
459 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
461 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
462 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
464 NormalizedNode peopleNode = PeopleModel.create();
465 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
466 snapshot.write(PeopleModel.BASE_PATH, peopleNode));
468 immediateCanCommit(cohort1);
469 FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
471 coordinatedPreCommit(cohort1);
472 verify(canCommitCallback2).onSuccess(null);
474 FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
475 doNothing().when(mockAbortCallback).onSuccess(null);
476 cohort1.abort(mockAbortCallback);
477 verify(mockAbortCallback).onSuccess(null);
479 FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
480 verify(preCommitCallback2).onFailure(any(Throwable.class));
482 immediateCanCommit(cohort3);
483 immediatePreCommit(cohort3);
484 immediateCommit(cohort3);
486 final DataTreeSnapshot snapshot =
487 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
488 Optional<NormalizedNode> optional = snapshot.readNode(PeopleModel.BASE_PATH);
489 assertTrue("People node present", optional.isPresent());
490 assertEquals("People node", peopleNode, optional.get());
/**
 * Feeds recovery a SODIUM_SR1-versioned commit payload whose candidate is built from the
 * {@code bigIntegerRoot()} tree at the empty (root) path.
 *
 * NOTE(review): the verification that follows the recovery call is not visible in this listing — presumably
 * it checks the recovered car list; confirm against the complete file.
 */
494 public void testUintCommitPayload() throws IOException {
495 shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(),
496 DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.empty(), bigIntegerRoot()),
497 PayloadVersion.SODIUM_SR1));
/**
 * Feeds recovery a snapshot state wrapping the {@code bigIntegerRoot()} tree.
 *
 * NOTE(review): the remaining ShardSnapshotState argument(s) and the verification step are not visible in
 * this listing — confirm against the complete file.
 */
503 public void testUintSnapshot() throws IOException, DataValidationFailedException {
504 shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(bigIntegerRoot()),
/**
 * Builds three candidates on a standalone in-memory data tree using BigInteger car prices, replays the first
 * as a recovery snapshot and the other two as SODIUM_SR1 recovery payloads, and compares the recovered car
 * list with one built from Uint64 prices.
 *
 * NOTE(review): several builder terminators and argument lines are missing from this listing (the data tree
 * factory arguments, the {@code build()} calls and the tail of the final assertion).
 */
511 public void testUintReplay() throws DataValidationFailedException, IOException {
512 // Commit two writes and one merge, saving the data tree candidate for each.
516 final DataTree dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
    // First candidate: write the cars container holding car "one".
518 DataTreeModification mod = dataTree.takeSnapshot().newModification();
519 mod.write(CarsModel.BASE_PATH, Builders.containerBuilder()
520 .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
521 .withChild(Builders.mapBuilder()
522 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
523 .withChild(createCar("one", BigInteger.ONE))
527 dataTree.validate(mod);
528 final DataTreeCandidate first = dataTree.prepare(mod);
529 dataTree.commit(first);
    // Second candidate: write car "two" directly at its entry path.
531 mod = dataTree.takeSnapshot().newModification();
532 mod.write(CarsModel.newCarPath("two"), createCar("two", BigInteger.TWO));
534 dataTree.validate(mod);
535 final DataTreeCandidate second = dataTree.prepare(mod);
536 dataTree.commit(second);
    // Third candidate: merge a car list containing car "three".
538 mod = dataTree.takeSnapshot().newModification();
539 mod.merge(CarsModel.CAR_LIST_PATH, Builders.mapBuilder()
540 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
541 .withChild(createCar("three", BigInteger.TEN))
544 dataTree.validate(mod);
545 final DataTreeCandidate third = dataTree.prepare(mod);
546 dataTree.commit(third);
548 // Apply first candidate as a snapshot
549 shardDataTree.applyRecoverySnapshot(
550 new ShardSnapshotState(new MetadataShardDataTreeSnapshot(first.getRootNode().getDataAfter().get()), true));
551 // Apply the other two snapshots as transactions
552 shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), second,
553 PayloadVersion.SODIUM_SR1));
554 shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), third,
555 PayloadVersion.SODIUM_SR1));
557 // Verify uint translation
558 final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
559 final NormalizedNode cars = snapshot.readNode(CarsModel.CAR_LIST_PATH).get();
    // Expected list: the same three cars, with prices expressed as Uint64.
561 assertEquals(Builders.mapBuilder()
562 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
564 .withChild(createCar("one", Uint64.ONE))
565 .withChild(createCar("two", Uint64.TWO))
566 .withChild(createCar("three", Uint64.TEN))
/**
 * Reads the car list from the tree under test and compares it with a list holding the single car "foo"
 * priced at {@code Uint64.ONE}.
 *
 * NOTE(review): the tail of the assertion (builder terminator and actual value) is missing from this listing.
 */
570 private void assertCarsUint64() {
571 final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
    // Unchecked get(): an absent car list surfaces as NoSuchElementException rather than an assertion failure.
572 final NormalizedNode cars = snapshot.readNode(CarsModel.CAR_LIST_PATH).get();
574 assertEquals(Builders.mapBuilder()
575 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
577 .withChild(createCar("foo", Uint64.ONE))
581 private static ContainerNode bigIntegerRoot() {
582 return Builders.containerBuilder()
583 .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
584 .withChild(Builders.containerBuilder()
585 .withNodeIdentifier(new NodeIdentifier(CarsModel.CARS_QNAME))
586 .withChild(Builders.mapBuilder()
587 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
589 .withChild(createCar("foo", BigInteger.ONE))
595 private static MapEntryNode createCar(final String name, final Object value) {
596 return Builders.mapEntryBuilder()
597 .withNodeIdentifier(NodeIdentifierWithPredicates.of(CarsModel.CAR_QNAME,CarsModel.CAR_NAME_QNAME, name))
598 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_NAME_QNAME, name))
599 // Note: old BigInteger
600 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_PRICE_QNAME, value))
604 private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
605 final ReadWriteShardDataTreeTransaction transaction =
606 shardDataTree.newReadWriteTransaction(nextTransactionId());
607 final DataTreeModification snapshot = transaction.getSnapshot();
608 operation.execute(snapshot);
609 return shardDataTree.finishTransaction(transaction, Optional.empty());
/**
 * Verifies that {@code listener} was notified at least once and passes every candidate from every captured
 * notification to {@code callback}.
 *
 * NOTE(review): the end of this method is not visible in this listing. Callers invoke it repeatedly on the
 * same mock and {@code Mockito.reset} is statically imported, so a trailing {@code reset(listener)} is
 * presumably missing here — confirm against the complete file.
 *
 * @param listener the mocked listener to inspect
 * @param callback assertion logic run once per captured candidate
 */
612 @SuppressWarnings({ "rawtypes", "unchecked" })
613 private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
614 final Consumer<DataTreeCandidate> callback) {
    // Raw Collection is used for the captor, hence the suppressed warnings above.
615 ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
616 verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
    // getAllValues() yields one collection per onDataTreeChanged invocation.
617 for (Collection list : changes.getAllValues()) {
618 for (Object dtc : list) {
619 callback.accept((DataTreeCandidate)dtc);
626 private static NormalizedNode getCars(final ShardDataTree shardDataTree) {
627 final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
628 shardDataTree.newReadOnlyTransaction(nextTransactionId());
629 final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
631 final Optional<NormalizedNode> optional = snapshot1.readNode(CarsModel.BASE_PATH);
633 assertTrue(optional.isPresent());
635 return optional.get();
638 private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) {
639 return addCar(shardDataTree, "altima");
642 private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) {
643 return doTransaction(shardDataTree, snapshot -> {
644 snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
645 snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
646 snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, Uint64.valueOf(100)));
650 private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) {
651 return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
655 private interface DataTreeOperation {
656 void execute(DataTreeModification snapshot);
659 private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
660 final DataTreeOperation operation) {
661 final ReadWriteShardDataTreeTransaction transaction =
662 shardDataTree.newReadWriteTransaction(nextTransactionId());
663 final DataTreeModification snapshot = transaction.getSnapshot();
664 operation.execute(snapshot);
665 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
667 immediateCanCommit(cohort);
668 immediatePreCommit(cohort);
669 final DataTreeCandidate candidate = cohort.getCandidate();
670 immediateCommit(cohort);
675 private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
676 final List<DataTreeCandidate> candidates) {
677 final ReadWriteShardDataTreeTransaction transaction =
678 shardDataTree.newReadWriteTransaction(nextTransactionId());
679 final DataTreeModification snapshot = transaction.getSnapshot();
680 for (final DataTreeCandidate candidateTip : candidates) {
681 DataTreeCandidates.applyToModification(snapshot, candidateTip);
683 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
685 immediateCanCommit(cohort);
686 immediatePreCommit(cohort);
687 final DataTreeCandidate candidate = cohort.getCandidate();
688 immediateCommit(cohort);