2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.anyBoolean;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.atLeastOnce;
18 import static org.mockito.Mockito.doNothing;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.inOrder;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.never;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.verify;
25 import static org.mockito.Mockito.verifyNoMoreInteractions;
26 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
27 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
28 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
29 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
30 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
31 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
32 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
33 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
35 import com.google.common.base.Ticker;
36 import com.google.common.primitives.UnsignedLong;
37 import com.google.common.util.concurrent.FutureCallback;
38 import java.io.IOException;
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.HashMap;
42 import java.util.List;
44 import java.util.Optional;
45 import java.util.function.Consumer;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.mockito.ArgumentCaptor;
49 import org.mockito.InOrder;
50 import org.mockito.Mockito;
51 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
52 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
53 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
54 import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
55 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
56 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
57 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
59 import org.opendaylight.yangtools.yang.common.Empty;
60 import org.opendaylight.yangtools.yang.common.Uint64;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
64 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
65 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
66 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
67 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
68 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
69 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
70 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
71 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
72 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
73 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
74 import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
75 import org.opendaylight.yangtools.yang.data.tree.api.ModificationType;
76 import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
77 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
78 import org.opendaylight.yangtools.yang.data.tree.spi.DataTreeCandidates;
79 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
80 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
82 public class ShardDataTreeTest extends AbstractTest {
83 private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
85 private final Shard mockShard = Mockito.mock(Shard.class);
86 private ShardDataTree shardDataTree;
87 private EffectiveModelContext fullSchema;
91 doReturn(Ticker.systemTicker()).when(mockShard).ticker();
92 doReturn(new ShardStats("shardName", "mxBeanType", mockShard)).when(mockShard).getShardMBean();
93 doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
95 fullSchema = SchemaContextHelper.full();
97 shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
101 public void testWrite() {
102 modify(false, true, true);
106 public void testMerge() {
107 modify(true, true, true);
110 private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) {
111 immediatePayloadReplication(shardDataTree, mockShard);
113 assertEquals(fullSchema, shardDataTree.getSchemaContext());
115 final ReadWriteShardDataTreeTransaction transaction =
116 shardDataTree.newReadWriteTransaction(nextTransactionId());
118 final DataTreeModification snapshot = transaction.getSnapshot();
120 assertNotNull(snapshot);
123 snapshot.merge(CarsModel.BASE_PATH, CarsModel.create());
124 snapshot.merge(PeopleModel.BASE_PATH, PeopleModel.create());
126 snapshot.write(CarsModel.BASE_PATH, CarsModel.create());
127 snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
130 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
132 immediateCanCommit(cohort);
133 immediatePreCommit(cohort);
134 immediateCommit(cohort);
136 final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
137 shardDataTree.newReadOnlyTransaction(nextTransactionId());
139 final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
141 final Optional<NormalizedNode> optional = snapshot1.readNode(CarsModel.BASE_PATH);
143 assertEquals(expectedCarsPresent, optional.isPresent());
145 final Optional<NormalizedNode> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
147 assertEquals(expectedPeoplePresent, optional1.isPresent());
151 public void bug4359AddRemoveCarOnce() {
152 immediatePayloadReplication(shardDataTree, mockShard);
154 final List<DataTreeCandidate> candidates = new ArrayList<>();
155 candidates.add(addCar(shardDataTree));
156 candidates.add(removeCar(shardDataTree));
158 final NormalizedNode expected = getCars(shardDataTree);
160 applyCandidates(shardDataTree, candidates);
162 final NormalizedNode actual = getCars(shardDataTree);
164 assertEquals(expected, actual);
168 public void bug4359AddRemoveCarTwice() {
169 immediatePayloadReplication(shardDataTree, mockShard);
171 final List<DataTreeCandidate> candidates = new ArrayList<>();
172 candidates.add(addCar(shardDataTree));
173 candidates.add(removeCar(shardDataTree));
174 candidates.add(addCar(shardDataTree));
175 candidates.add(removeCar(shardDataTree));
177 final NormalizedNode expected = getCars(shardDataTree);
179 applyCandidates(shardDataTree, candidates);
181 final NormalizedNode actual = getCars(shardDataTree);
183 assertEquals(expected, actual);
187 public void testListenerNotifiedOnApplySnapshot() throws Exception {
188 immediatePayloadReplication(shardDataTree, mockShard);
190 DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
191 shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
192 Optional.empty(), noop -> { });
194 addCar(shardDataTree, "optima");
196 verifyOnDataTreeChanged(listener, dtc -> {
197 assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().modificationType());
198 assertEquals("getRootPath", CarsModel.newCarPath("optima"), dtc.getRootPath());
201 addCar(shardDataTree, "sportage");
203 verifyOnDataTreeChanged(listener, dtc -> {
204 assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().modificationType());
205 assertEquals("getRootPath", CarsModel.newCarPath("sportage"), dtc.getRootPath());
208 ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
209 immediatePayloadReplication(newDataTree, mockShard);
210 addCar(newDataTree, "optima");
211 addCar(newDataTree, "murano");
213 shardDataTree.applySnapshot(newDataTree.takeStateSnapshot());
215 Map<YangInstanceIdentifier, ModificationType> expChanges = new HashMap<>();
216 expChanges.put(CarsModel.newCarPath("optima"), ModificationType.WRITE);
217 expChanges.put(CarsModel.newCarPath("murano"), ModificationType.WRITE);
218 expChanges.put(CarsModel.newCarPath("sportage"), ModificationType.DELETE);
219 verifyOnDataTreeChanged(listener, dtc -> {
220 ModificationType expType = expChanges.remove(dtc.getRootPath());
221 assertNotNull("Got unexpected change for " + dtc.getRootPath(), expType);
222 assertEquals("getModificationType", expType, dtc.getRootNode().modificationType());
225 if (!expChanges.isEmpty()) {
226 fail("Missing change notifications: " + expChanges);
231 public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
232 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
233 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
235 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
236 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
238 NormalizedNode peopleNode = PeopleModel.create();
239 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
240 snapshot.write(PeopleModel.BASE_PATH, peopleNode));
242 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
243 MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
244 final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
246 immediateCanCommit(cohort1);
247 final FutureCallback<Empty> canCommitCallback2 = coordinatedCanCommit(cohort2);
248 final FutureCallback<Empty> canCommitCallback3 = coordinatedCanCommit(cohort3);
249 final FutureCallback<Empty> canCommitCallback4 = coordinatedCanCommit(cohort4);
251 final FutureCallback<DataTreeCandidate> preCommitCallback1 = coordinatedPreCommit(cohort1);
252 verify(preCommitCallback1).onSuccess(cohort1.getCandidate());
253 verify(canCommitCallback2).onSuccess(Empty.value());
255 final FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
256 verify(preCommitCallback2).onSuccess(cohort2.getCandidate());
257 verify(canCommitCallback3).onSuccess(Empty.value());
259 final FutureCallback<DataTreeCandidate> preCommitCallback3 = coordinatedPreCommit(cohort3);
260 verify(preCommitCallback3).onSuccess(cohort3.getCandidate());
261 verify(canCommitCallback4).onSuccess(Empty.value());
263 final FutureCallback<DataTreeCandidate> preCommitCallback4 = coordinatedPreCommit(cohort4);
264 verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
266 final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
267 verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
269 verifyNoMoreInteractions(commitCallback2);
271 final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
272 verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
274 verifyNoMoreInteractions(commitCallback4);
276 final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
277 InOrder inOrder = inOrder(mockShard);
278 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
280 inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
282 verifyNoMoreInteractions(commitCallback1);
283 verifyNoMoreInteractions(commitCallback2);
285 final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
286 inOrder = inOrder(mockShard);
287 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
289 inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
291 verifyNoMoreInteractions(commitCallback3);
292 verifyNoMoreInteractions(commitCallback4);
294 final ShardDataTreeCohort cohort5 = newShardDataTreeCohort(snapshot ->
295 snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
296 final FutureCallback<Empty> canCommitCallback5 = coordinatedCanCommit(cohort5);
298 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
299 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
300 cohort1.getCandidate());
301 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
302 shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
303 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
304 shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
306 inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
307 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
308 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
309 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
310 inOrder.verify(commitCallback4).onSuccess(any(UnsignedLong.class));
312 verify(canCommitCallback5).onSuccess(Empty.value());
314 final DataTreeSnapshot snapshot =
315 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
316 assertEquals("Car node", Optional.of(carNode), snapshot.readNode(carPath));
317 assertEquals("People node", Optional.of(peopleNode), snapshot.readNode(PeopleModel.BASE_PATH));
321 public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
322 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
323 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
325 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
326 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
328 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
329 MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
330 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
332 final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
333 final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
334 final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
336 InOrder inOrder = inOrder(mockShard);
337 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
339 inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
341 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
344 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
345 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
346 cohort1.getCandidate());
347 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
348 shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
349 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
351 inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
352 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
353 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
354 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
356 final DataTreeSnapshot snapshot =
357 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
358 assertEquals("Car node", Optional.of(carNode), snapshot.readNode(carPath));
362 public void testPipelinedTransactionsWithImmediateReplication() {
363 immediatePayloadReplication(shardDataTree, mockShard);
365 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
366 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
368 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
369 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
371 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
372 MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
373 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
375 final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
376 final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
377 final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
379 InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
380 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
381 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
382 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
384 final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
385 Optional<NormalizedNode> optional = snapshot.readNode(CarsModel.BASE_PATH);
386 assertTrue("Car node present", optional.isPresent());
389 @SuppressWarnings("unchecked")
391 public void testAbortWithPendingCommits() throws Exception {
392 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
393 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
395 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
396 snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
398 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
399 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
401 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
402 MapEntryNode carNode = CarsModel.newCarEntry("optima", Uint64.valueOf(100));
403 final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
405 coordinatedCanCommit(cohort2);
406 immediateCanCommit(cohort1);
407 coordinatedCanCommit(cohort3);
408 coordinatedCanCommit(cohort4);
410 coordinatedPreCommit(cohort1);
411 coordinatedPreCommit(cohort2);
412 coordinatedPreCommit(cohort3);
414 FutureCallback<Empty> mockAbortCallback = mock(FutureCallback.class);
415 doNothing().when(mockAbortCallback).onSuccess(Empty.value());
416 cohort2.abort(mockAbortCallback);
417 verify(mockAbortCallback).onSuccess(Empty.value());
419 coordinatedPreCommit(cohort4);
420 coordinatedCommit(cohort1);
421 coordinatedCommit(cohort3);
422 coordinatedCommit(cohort4);
424 InOrder inOrder = inOrder(mockShard);
425 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
427 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
429 inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
432 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
433 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
434 cohort1.getCandidate());
435 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
436 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
437 shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
439 final DataTreeSnapshot snapshot =
440 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
441 Optional<NormalizedNode> optional = snapshot.readNode(carPath);
442 assertEquals("Car node", Optional.of(carNode), optional);
445 @SuppressWarnings("unchecked")
447 public void testAbortWithFailedRebase() {
448 immediatePayloadReplication(shardDataTree, mockShard);
450 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
451 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
453 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
454 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
456 NormalizedNode peopleNode = PeopleModel.create();
457 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
458 snapshot.write(PeopleModel.BASE_PATH, peopleNode));
460 immediateCanCommit(cohort1);
461 FutureCallback<Empty> canCommitCallback2 = coordinatedCanCommit(cohort2);
463 coordinatedPreCommit(cohort1);
464 verify(canCommitCallback2).onSuccess(Empty.value());
466 FutureCallback<Empty> mockAbortCallback = mock(FutureCallback.class);
467 doNothing().when(mockAbortCallback).onSuccess(Empty.value());
468 cohort1.abort(mockAbortCallback);
469 verify(mockAbortCallback).onSuccess(Empty.value());
471 FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
472 verify(preCommitCallback2).onFailure(any(Throwable.class));
474 immediateCanCommit(cohort3);
475 immediatePreCommit(cohort3);
476 immediateCommit(cohort3);
478 final DataTreeSnapshot snapshot =
479 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
480 Optional<NormalizedNode> optional = snapshot.readNode(PeopleModel.BASE_PATH);
481 assertEquals("People node", Optional.of(peopleNode), optional);
485 public void testUintCommitPayload() throws IOException {
486 shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(),
487 DataTreeCandidates.fromNormalizedNode(YangInstanceIdentifier.of(), bigIntegerRoot()),
488 PayloadVersion.POTASSIUM));
494 public void testUintSnapshot() throws IOException, DataValidationFailedException {
495 shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(new MetadataShardDataTreeSnapshot(bigIntegerRoot()),
502 public void testUintReplay() throws DataValidationFailedException, IOException {
503 // Commit two writes and one merge, saving the data tree candidate for each.
507 final DataTree dataTree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_OPERATIONAL,
509 DataTreeModification mod = dataTree.takeSnapshot().newModification();
510 mod.write(CarsModel.BASE_PATH, Builders.containerBuilder()
511 .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
512 .withChild(Builders.mapBuilder()
513 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
514 .withChild(createCar("one", Uint64.ONE))
518 dataTree.validate(mod);
519 final DataTreeCandidate first = dataTree.prepare(mod);
520 dataTree.commit(first);
522 mod = dataTree.takeSnapshot().newModification();
523 mod.write(CarsModel.newCarPath("two"), createCar("two", Uint64.TWO));
525 dataTree.validate(mod);
526 final DataTreeCandidate second = dataTree.prepare(mod);
527 dataTree.commit(second);
529 mod = dataTree.takeSnapshot().newModification();
530 mod.merge(CarsModel.CAR_LIST_PATH, Builders.mapBuilder()
531 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
532 .withChild(createCar("three", Uint64.TEN))
535 dataTree.validate(mod);
536 final DataTreeCandidate third = dataTree.prepare(mod);
537 dataTree.commit(third);
539 // Apply first candidate as a snapshot
540 shardDataTree.applyRecoverySnapshot(new ShardSnapshotState(
541 new MetadataShardDataTreeSnapshot(first.getRootNode().getDataAfter()), true));
542 // Apply the other two snapshots as transactions
543 shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), second,
544 PayloadVersion.POTASSIUM));
545 shardDataTree.applyRecoveryPayload(CommitTransactionPayload.create(nextTransactionId(), third,
546 PayloadVersion.POTASSIUM));
548 // Verify uint translation
549 final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
551 assertEquals(Builders.mapBuilder()
552 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
554 .withChild(createCar("one", Uint64.ONE))
555 .withChild(createCar("two", Uint64.TWO))
556 .withChild(createCar("three", Uint64.TEN))
557 .build(), snapshot.readNode(CarsModel.CAR_LIST_PATH).orElseThrow());
560 private void assertCarsUint64() {
561 final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
562 final NormalizedNode cars = snapshot.readNode(CarsModel.CAR_LIST_PATH).orElseThrow();
564 assertEquals(Builders.mapBuilder()
565 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
567 .withChild(createCar("foo", Uint64.ONE))
571 private static ContainerNode bigIntegerRoot() {
572 return Builders.containerBuilder()
573 .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
574 .withChild(Builders.containerBuilder()
575 .withNodeIdentifier(new NodeIdentifier(CarsModel.CARS_QNAME))
576 .withChild(Builders.mapBuilder()
577 .withNodeIdentifier(new NodeIdentifier(CarsModel.CAR_QNAME))
578 .withChild(createCar("foo", Uint64.ONE))
584 private static MapEntryNode createCar(final String name, final Object value) {
585 return Builders.mapEntryBuilder()
586 .withNodeIdentifier(NodeIdentifierWithPredicates.of(CarsModel.CAR_QNAME, CarsModel.CAR_NAME_QNAME, name))
587 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_NAME_QNAME, name))
588 // Note: old BigInteger
589 .withChild(ImmutableNodes.leafNode(CarsModel.CAR_PRICE_QNAME, value))
593 private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
594 final ReadWriteShardDataTreeTransaction transaction =
595 shardDataTree.newReadWriteTransaction(nextTransactionId());
596 final DataTreeModification snapshot = transaction.getSnapshot();
597 operation.execute(snapshot);
598 return shardDataTree.finishTransaction(transaction, Optional.empty());
601 @SuppressWarnings({ "rawtypes", "unchecked" })
602 private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
603 final Consumer<DataTreeCandidate> callback) {
604 ArgumentCaptor<List> changes = ArgumentCaptor.forClass(List.class);
605 verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
606 for (Collection list : changes.getAllValues()) {
607 for (Object dtc : list) {
608 callback.accept((DataTreeCandidate)dtc);
615 private static NormalizedNode getCars(final ShardDataTree shardDataTree) {
616 final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
617 shardDataTree.newReadOnlyTransaction(nextTransactionId());
618 final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
620 final Optional<NormalizedNode> optional = snapshot1.readNode(CarsModel.BASE_PATH);
622 assertTrue(optional.isPresent());
624 return optional.orElseThrow();
627 private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) {
628 return addCar(shardDataTree, "altima");
631 private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) {
632 return doTransaction(shardDataTree, snapshot -> {
633 snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
634 snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
635 snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, Uint64.valueOf(100)));
639 private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) {
640 return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
644 private interface DataTreeOperation {
645 void execute(DataTreeModification snapshot);
648 private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
649 final DataTreeOperation operation) {
650 final ReadWriteShardDataTreeTransaction transaction =
651 shardDataTree.newReadWriteTransaction(nextTransactionId());
652 final DataTreeModification snapshot = transaction.getSnapshot();
653 operation.execute(snapshot);
654 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
656 immediateCanCommit(cohort);
657 immediatePreCommit(cohort);
658 final DataTreeCandidate candidate = cohort.getCandidate();
659 immediateCommit(cohort);
664 private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
665 final List<DataTreeCandidate> candidates) {
666 final ReadWriteShardDataTreeTransaction transaction =
667 shardDataTree.newReadWriteTransaction(nextTransactionId());
668 final DataTreeModification snapshot = transaction.getSnapshot();
669 for (final DataTreeCandidate candidateTip : candidates) {
670 DataTreeCandidates.applyToModification(snapshot, candidateTip);
672 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
674 immediateCanCommit(cohort);
675 immediatePreCommit(cohort);
676 final DataTreeCandidate candidate = cohort.getCandidate();
677 immediateCommit(cohort);