2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.ArgumentMatchers.anyBoolean;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.atLeastOnce;
19 import static org.mockito.Mockito.doNothing;
20 import static org.mockito.Mockito.doReturn;
21 import static org.mockito.Mockito.inOrder;
22 import static org.mockito.Mockito.mock;
23 import static org.mockito.Mockito.never;
24 import static org.mockito.Mockito.reset;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.Mockito.verifyNoMoreInteractions;
27 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
28 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
29 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
30 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediate3PhaseCommit;
31 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
32 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
33 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePayloadReplication;
34 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
36 import com.google.common.base.Ticker;
37 import com.google.common.collect.Maps;
38 import com.google.common.primitives.UnsignedLong;
39 import com.google.common.util.concurrent.FutureCallback;
40 import java.math.BigInteger;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.List;
45 import java.util.Optional;
46 import java.util.function.Consumer;
47 import org.junit.Before;
48 import org.junit.Test;
49 import org.mockito.ArgumentCaptor;
50 import org.mockito.InOrder;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
53 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
54 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
55 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
56 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
63 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
64 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
65 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
66 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Unit tests for {@link ShardDataTree}, exercised against a Mockito-mocked {@link Shard}.
 */
69 public class ShardDataTreeTest extends AbstractTest {
// Default-built datastore configuration returned by the mocked shard.
70 private static final DatastoreContext DATASTORE_CONTEXT = DatastoreContext.newBuilder().build();
// Mocked owner of the tree; tests stub it and verify the calls made on it.
72 private final Shard mockShard = Mockito.mock(Shard.class);
// Tree under test.
73 private ShardDataTree shardDataTree;
// Schema context the tree under test is constructed with.
74 private SchemaContext fullSchema;
// Fixture initialization (the enclosing method header is not visible in this listing):
// stub the shard's ticker, MBean and datastore context, then build an OPERATIONAL tree on the full schema.
78 doReturn(Ticker.systemTicker()).when(mockShard).ticker();
79 doReturn(mock(ShardStats.class)).when(mockShard).getShardMBean();
80 doReturn(DATASTORE_CONTEXT).when(mockShard).getDatastoreContext();
82 fullSchema = SchemaContextHelper.full();
84 shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
88 public void testWrite() {
89 modify(false, true, true);
93 public void testMerge() {
94 modify(true, true, true);
97 private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent) {
98 immediatePayloadReplication(shardDataTree, mockShard);
100 assertEquals(fullSchema, shardDataTree.getSchemaContext());
102 final ReadWriteShardDataTreeTransaction transaction =
103 shardDataTree.newReadWriteTransaction(nextTransactionId());
105 final DataTreeModification snapshot = transaction.getSnapshot();
107 assertNotNull(snapshot);
110 snapshot.merge(CarsModel.BASE_PATH, CarsModel.create());
111 snapshot.merge(PeopleModel.BASE_PATH, PeopleModel.create());
113 snapshot.write(CarsModel.BASE_PATH, CarsModel.create());
114 snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
117 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
119 immediateCanCommit(cohort);
120 immediatePreCommit(cohort);
121 immediateCommit(cohort);
123 final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
124 shardDataTree.newReadOnlyTransaction(nextTransactionId());
126 final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
128 final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
130 assertEquals(expectedCarsPresent, optional.isPresent());
132 final Optional<NormalizedNode<?, ?>> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
134 assertEquals(expectedPeoplePresent, optional1.isPresent());
138 public void bug4359AddRemoveCarOnce() {
139 immediatePayloadReplication(shardDataTree, mockShard);
141 final List<DataTreeCandidate> candidates = new ArrayList<>();
142 candidates.add(addCar(shardDataTree));
143 candidates.add(removeCar(shardDataTree));
145 final NormalizedNode<?, ?> expected = getCars(shardDataTree);
147 applyCandidates(shardDataTree, candidates);
149 final NormalizedNode<?, ?> actual = getCars(shardDataTree);
151 assertEquals(expected, actual);
155 public void bug4359AddRemoveCarTwice() {
156 immediatePayloadReplication(shardDataTree, mockShard);
158 final List<DataTreeCandidate> candidates = new ArrayList<>();
159 candidates.add(addCar(shardDataTree));
160 candidates.add(removeCar(shardDataTree));
161 candidates.add(addCar(shardDataTree));
162 candidates.add(removeCar(shardDataTree));
164 final NormalizedNode<?, ?> expected = getCars(shardDataTree);
166 applyCandidates(shardDataTree, candidates);
168 final NormalizedNode<?, ?> actual = getCars(shardDataTree);
170 assertEquals(expected, actual);
174 public void testListenerNotifiedOnApplySnapshot() throws Exception {
175 immediatePayloadReplication(shardDataTree, mockShard);
177 DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
178 shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener,
179 com.google.common.base.Optional.absent(), noop -> { });
181 addCar(shardDataTree, "optima");
183 verifyOnDataTreeChanged(listener, dtc -> {
184 assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
185 assertEquals("getRootPath", CarsModel.newCarPath("optima"), dtc.getRootPath());
188 addCar(shardDataTree, "sportage");
190 verifyOnDataTreeChanged(listener, dtc -> {
191 assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
192 assertEquals("getRootPath", CarsModel.newCarPath("sportage"), dtc.getRootPath());
195 ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
196 immediatePayloadReplication(newDataTree, mockShard);
197 addCar(newDataTree, "optima");
198 addCar(newDataTree, "murano");
200 shardDataTree.applySnapshot(newDataTree.takeStateSnapshot());
202 Map<YangInstanceIdentifier, ModificationType> expChanges = Maps.newHashMap();
203 expChanges.put(CarsModel.newCarPath("optima"), ModificationType.WRITE);
204 expChanges.put(CarsModel.newCarPath("murano"), ModificationType.WRITE);
205 expChanges.put(CarsModel.newCarPath("sportage"), ModificationType.DELETE);
206 verifyOnDataTreeChanged(listener, dtc -> {
207 ModificationType expType = expChanges.remove(dtc.getRootPath());
208 assertNotNull("Got unexpected change for " + dtc.getRootPath(), expType);
209 assertEquals("getModificationType", expType, dtc.getRootNode().getModificationType());
212 if (!expChanges.isEmpty()) {
213 fail("Missing change notifications: " + expChanges);
/**
 * Pipelines four transactions (cars container, car list, people, one car entry) through
 * callback-driven canCommit/preCommit/commit and verifies the ordering of callbacks and persisted payloads.
 *
 * <p>Commit is requested out of order (cohort2, cohort4, cohort1, cohort3). The test checks that payloads
 * are handed to the mocked shard as cohort1 then cohort2, and later cohort3 then cohort4, and that the
 * commit callbacks succeed in order 1..4 once the replicated payloads have been applied.
 *
 * <p>NOTE(review): every persistPayload(...) verification below ends with a trailing comma in this
 * listing, so its remaining argument(s) are not visible here - confirm against the full file.
 */
218 public void testPipelinedTransactionsWithCoordinatedCommits() throws Exception {
219 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
220 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
222 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
223 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
225 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
226 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
227 snapshot.write(PeopleModel.BASE_PATH, peopleNode));
229 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
230 MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
231 final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
// Only cohort1 uses immediateCanCommit; the other cohorts' canCommit callbacks are verified below,
// each right after its predecessor's preCommit.
233 immediateCanCommit(cohort1);
234 final FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
235 final FutureCallback<Void> canCommitCallback3 = coordinatedCanCommit(cohort3);
236 final FutureCallback<Void> canCommitCallback4 = coordinatedCanCommit(cohort4);
238 final FutureCallback<DataTreeCandidate> preCommitCallback1 = coordinatedPreCommit(cohort1);
239 verify(preCommitCallback1).onSuccess(cohort1.getCandidate());
240 verify(canCommitCallback2).onSuccess(null);
242 final FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
243 verify(preCommitCallback2).onSuccess(cohort2.getCandidate());
244 verify(canCommitCallback3).onSuccess(null);
246 final FutureCallback<DataTreeCandidate> preCommitCallback3 = coordinatedPreCommit(cohort3);
247 verify(preCommitCallback3).onSuccess(cohort3.getCandidate());
248 verify(canCommitCallback4).onSuccess(null);
250 final FutureCallback<DataTreeCandidate> preCommitCallback4 = coordinatedPreCommit(cohort4);
251 verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
// Commit is requested for cohort2 and cohort4 ahead of their predecessors: the never() verifications
// assert no matching persistPayload call yet, and the commit callbacks must have seen no interaction.
253 final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
254 verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
256 verifyNoMoreInteractions(commitCallback2);
258 final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
259 verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
261 verifyNoMoreInteractions(commitCallback4);
// Once cohort1 commits, payloads for cohort1 and then cohort2 must be persisted, in that order.
263 final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
264 InOrder inOrder = inOrder(mockShard);
265 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
267 inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
269 verifyNoMoreInteractions(commitCallback1);
270 verifyNoMoreInteractions(commitCallback2);
// Likewise, committing cohort3 must lead to cohort3 and then cohort4 being persisted.
272 final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
273 inOrder = inOrder(mockShard);
274 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
276 inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
278 verifyNoMoreInteractions(commitCallback3);
279 verifyNoMoreInteractions(commitCallback4);
// A fifth transaction is submitted while the first four await replication; its canCommit callback
// is verified only after the replicated payloads have been applied.
281 final ShardDataTreeCohort cohort5 = newShardDataTreeCohort(snapshot ->
282 snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
283 final FutureCallback<Void> canCommitCallback5 = coordinatedCanCommit(cohort5);
285 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
286 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
287 cohort1.getCandidate());
288 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
289 shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
290 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
291 shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
// Commit callbacks must succeed in transaction order.
293 inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
294 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
295 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
296 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
297 inOrder.verify(commitCallback4).onSuccess(any(UnsignedLong.class));
299 verify(canCommitCallback5).onSuccess(null);
// The final tree must contain both the car entry and the people subtree.
301 final DataTreeSnapshot snapshot =
302 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
303 Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
304 assertTrue("Car node present", optional.isPresent());
305 assertEquals("Car node", carNode, optional.get());
307 optional = snapshot.readNode(PeopleModel.BASE_PATH);
308 assertTrue("People node present", optional.isPresent());
309 assertEquals("People node", peopleNode, optional.get());
/**
 * Pipelines three transactions (cars container, car list, one car entry) and runs
 * {@code immediate3PhaseCommit} on them out of order (cohort2, cohort3, cohort1). Verifies that payloads
 * are still persisted as cohort1, cohort2, cohort3 and that the commit callbacks succeed in that same
 * order after the replicated payloads are applied.
 *
 * <p>NOTE(review): every persistPayload(...) verification below ends with a trailing comma in this
 * listing, so its remaining argument(s) are not visible here - confirm against the full file.
 */
313 public void testPipelinedTransactionsWithImmediateCommits() throws Exception {
314 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
315 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
317 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
318 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
320 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
321 MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
322 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
// Deliberately start the three-phase commits in a different order than the transactions were created.
324 final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
325 final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
326 final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
// Persistence must nevertheless follow creation order.
328 InOrder inOrder = inOrder(mockShard);
329 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
331 inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class),
333 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
336 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
337 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
338 cohort1.getCandidate());
339 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
340 shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
341 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
// Commit callbacks must succeed in creation order as well.
343 inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
344 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
345 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
346 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
// The car entry written by the last transaction must be readable.
348 final DataTreeSnapshot snapshot =
349 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
350 Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
351 assertTrue("Car node present", optional.isPresent());
352 assertEquals("Car node", carNode, optional.get());
356 public void testPipelinedTransactionsWithImmediateReplication() {
357 immediatePayloadReplication(shardDataTree, mockShard);
359 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
360 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
362 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
363 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
365 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
366 MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
367 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
369 final FutureCallback<UnsignedLong> commitCallback1 = immediate3PhaseCommit(cohort1);
370 final FutureCallback<UnsignedLong> commitCallback2 = immediate3PhaseCommit(cohort2);
371 final FutureCallback<UnsignedLong> commitCallback3 = immediate3PhaseCommit(cohort3);
373 InOrder inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3);
374 inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
375 inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
376 inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
378 final DataTreeSnapshot snapshot = shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
379 Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(CarsModel.BASE_PATH);
380 assertTrue("Car node present", optional.isPresent());
/**
 * Aborts one transaction (cohort2, the people write) after it has gone through preCommit in the middle
 * of a pipeline of four, then verifies that the remaining cohorts are persisted as cohort1, cohort3,
 * cohort4 and that the car entry written by cohort4 is readable at the end.
 *
 * <p>NOTE(review): every persistPayload(...) verification below ends with a trailing comma in this
 * listing, so its remaining argument(s) are not visible here - confirm against the full file.
 */
383 @SuppressWarnings("unchecked")
385 public void testAbortWithPendingCommits() throws Exception {
386 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
387 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
389 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
390 snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
392 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
393 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
395 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
396 MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
397 final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
// canCommit is requested for cohort2 before cohort1.
399 coordinatedCanCommit(cohort2);
400 immediateCanCommit(cohort1);
401 coordinatedCanCommit(cohort3);
402 coordinatedCanCommit(cohort4);
404 coordinatedPreCommit(cohort1);
405 coordinatedPreCommit(cohort2);
406 coordinatedPreCommit(cohort3);
// Abort cohort2 while cohort1..cohort3 have been through preCommit; the abort callback must succeed.
408 FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
409 doNothing().when(mockAbortCallback).onSuccess(null);
410 cohort2.abort(mockAbortCallback);
411 verify(mockAbortCallback).onSuccess(null);
413 coordinatedPreCommit(cohort4);
414 coordinatedCommit(cohort1);
415 coordinatedCommit(cohort3);
416 coordinatedCommit(cohort4);
// Only the three surviving cohorts are expected to be persisted, in pipeline order.
418 InOrder inOrder = inOrder(mockShard);
419 inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class),
421 inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class),
423 inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class),
426 // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
427 CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
428 cohort1.getCandidate());
429 shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
430 shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
431 shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
// The car entry written by cohort4 must be readable.
433 final DataTreeSnapshot snapshot =
434 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
435 Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
436 assertTrue("Car node present", optional.isPresent());
437 assertEquals("Car node", carNode, optional.get());
440 @SuppressWarnings("unchecked")
442 public void testAbortWithFailedRebase() {
443 immediatePayloadReplication(shardDataTree, mockShard);
445 final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
446 snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
448 final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
449 snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
451 NormalizedNode<?, ?> peopleNode = PeopleModel.create();
452 final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
453 snapshot.write(PeopleModel.BASE_PATH, peopleNode));
455 immediateCanCommit(cohort1);
456 FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
458 coordinatedPreCommit(cohort1);
459 verify(canCommitCallback2).onSuccess(null);
461 FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
462 doNothing().when(mockAbortCallback).onSuccess(null);
463 cohort1.abort(mockAbortCallback);
464 verify(mockAbortCallback).onSuccess(null);
466 FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
467 verify(preCommitCallback2).onFailure(any(Throwable.class));
469 immediateCanCommit(cohort3);
470 immediatePreCommit(cohort3);
471 immediateCommit(cohort3);
473 final DataTreeSnapshot snapshot =
474 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
475 Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(PeopleModel.BASE_PATH);
476 assertTrue("People node present", optional.isPresent());
477 assertEquals("People node", peopleNode, optional.get());
480 private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
481 final ReadWriteShardDataTreeTransaction transaction =
482 shardDataTree.newReadWriteTransaction(nextTransactionId());
483 final DataTreeModification snapshot = transaction.getSnapshot();
484 operation.execute(snapshot);
485 return shardDataTree.finishTransaction(transaction, Optional.empty());
488 @SuppressWarnings({ "rawtypes", "unchecked" })
489 private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
490 final Consumer<DataTreeCandidate> callback) {
491 ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
492 verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
493 for (Collection list : changes.getAllValues()) {
494 for (Object dtc : list) {
495 callback.accept((DataTreeCandidate)dtc);
502 private static NormalizedNode<?, ?> getCars(final ShardDataTree shardDataTree) {
503 final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
504 shardDataTree.newReadOnlyTransaction(nextTransactionId());
505 final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
507 final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
509 assertTrue(optional.isPresent());
511 return optional.get();
514 private static DataTreeCandidate addCar(final ShardDataTree shardDataTree) {
515 return addCar(shardDataTree, "altima");
518 private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name) {
519 return doTransaction(shardDataTree, snapshot -> {
520 snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
521 snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
522 snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, new BigInteger("100")));
526 private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree) {
527 return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
531 private interface DataTreeOperation {
532 void execute(DataTreeModification snapshot);
535 private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
536 final DataTreeOperation operation) {
537 final ReadWriteShardDataTreeTransaction transaction =
538 shardDataTree.newReadWriteTransaction(nextTransactionId());
539 final DataTreeModification snapshot = transaction.getSnapshot();
540 operation.execute(snapshot);
541 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
543 immediateCanCommit(cohort);
544 immediatePreCommit(cohort);
545 final DataTreeCandidate candidate = cohort.getCandidate();
546 immediateCommit(cohort);
551 private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
552 final List<DataTreeCandidate> candidates) {
553 final ReadWriteShardDataTreeTransaction transaction =
554 shardDataTree.newReadWriteTransaction(nextTransactionId());
555 final DataTreeModification snapshot = transaction.getSnapshot();
556 for (final DataTreeCandidate candidateTip : candidates) {
557 DataTreeCandidates.applyToModification(snapshot, candidateTip);
559 final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction, Optional.empty());
561 immediateCanCommit(cohort);
562 immediatePreCommit(cohort);
563 final DataTreeCandidate candidate = cohort.getCandidate();
564 immediateCommit(cohort);