BUG-7033: Implement pipe-lining in ShardDataTree
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardDataTreeTest.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.eq;
16 import static org.mockito.Mockito.atLeastOnce;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.inOrder;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.never;
22 import static org.mockito.Mockito.reset;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.verifyNoMoreInteractions;
25 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCanCommit;
26 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedCommit;
27 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.coordinatedPreCommit;
28 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCanCommit;
29 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediateCommit;
30 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.immediatePreCommit;
31
32 import com.google.common.base.Optional;
33 import com.google.common.base.Ticker;
34 import com.google.common.collect.Maps;
35 import com.google.common.primitives.UnsignedLong;
36 import com.google.common.util.concurrent.FutureCallback;
37 import java.math.BigInteger;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.concurrent.ExecutionException;
43 import java.util.function.Consumer;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.InOrder;
48 import org.mockito.Mockito;
49 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
50 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
51 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
52 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
53 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
57 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
63 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
64 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
65
66 public class ShardDataTreeTest extends AbstractTest {
67
68     private final Shard mockShard = Mockito.mock(Shard.class);
69     private ShardDataTree shardDataTree;
70     private SchemaContext fullSchema;
71
72     @Before
73     public void setUp() {
74         doReturn(true).when(mockShard).canSkipPayload();
75         doReturn(Ticker.systemTicker()).when(mockShard).ticker();
76         doReturn(Mockito.mock(ShardStats.class)).when(mockShard).getShardMBean();
77
78         fullSchema = SchemaContextHelper.full();
79
80         shardDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
81     }
82
83     @Test
84     public void testWrite() throws ExecutionException, InterruptedException {
85         modify(false, true, true);
86     }
87
88     @Test
89     public void testMerge() throws ExecutionException, InterruptedException {
90         modify(true, true, true);
91     }
92
93     private void modify(final boolean merge, final boolean expectedCarsPresent, final boolean expectedPeoplePresent)
94             throws ExecutionException, InterruptedException {
95
96         assertEquals(fullSchema, shardDataTree.getSchemaContext());
97
98         final ReadWriteShardDataTreeTransaction transaction =
99                 shardDataTree.newReadWriteTransaction(nextTransactionId());
100
101         final DataTreeModification snapshot = transaction.getSnapshot();
102
103         assertNotNull(snapshot);
104
105         if (merge) {
106             snapshot.merge(CarsModel.BASE_PATH, CarsModel.create());
107             snapshot.merge(PeopleModel.BASE_PATH, PeopleModel.create());
108         } else {
109             snapshot.write(CarsModel.BASE_PATH, CarsModel.create());
110             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create());
111         }
112
113         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
114
115         immediateCanCommit(cohort);
116         immediatePreCommit(cohort);
117         immediateCommit(cohort);
118
119         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
120                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
121
122         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
123
124         final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
125
126         assertEquals(expectedCarsPresent, optional.isPresent());
127
128         final Optional<NormalizedNode<?, ?>> optional1 = snapshot1.readNode(PeopleModel.BASE_PATH);
129
130         assertEquals(expectedPeoplePresent, optional1.isPresent());
131     }
132
133     @Test
134     public void bug4359AddRemoveCarOnce() throws ExecutionException, InterruptedException {
135         final List<DataTreeCandidate> candidates = new ArrayList<>();
136         candidates.add(addCar(shardDataTree));
137         candidates.add(removeCar(shardDataTree));
138
139         final NormalizedNode<?, ?> expected = getCars(shardDataTree);
140
141         applyCandidates(shardDataTree, candidates);
142
143         final NormalizedNode<?, ?> actual = getCars(shardDataTree);
144
145         assertEquals(expected, actual);
146     }
147
148     @Test
149     public void bug4359AddRemoveCarTwice() throws ExecutionException, InterruptedException {
150         final List<DataTreeCandidate> candidates = new ArrayList<>();
151         candidates.add(addCar(shardDataTree));
152         candidates.add(removeCar(shardDataTree));
153         candidates.add(addCar(shardDataTree));
154         candidates.add(removeCar(shardDataTree));
155
156         final NormalizedNode<?, ?> expected = getCars(shardDataTree);
157
158         applyCandidates(shardDataTree, candidates);
159
160         final NormalizedNode<?, ?> actual = getCars(shardDataTree);
161
162         assertEquals(expected, actual);
163     }
164
165     @Test
166     public void testListenerNotifiedOnApplySnapshot() throws Exception {
167         DOMDataTreeChangeListener listener = mock(DOMDataTreeChangeListener.class);
168         shardDataTree.registerTreeChangeListener(CarsModel.CAR_LIST_PATH.node(CarsModel.CAR_QNAME), listener);
169
170         addCar(shardDataTree, "optima");
171
172         verifyOnDataTreeChanged(listener, dtc -> {
173             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
174             assertEquals("getRootPath", CarsModel.newCarPath("optima"), dtc.getRootPath());
175         });
176
177         addCar(shardDataTree, "sportage");
178
179         verifyOnDataTreeChanged(listener, dtc -> {
180             assertEquals("getModificationType", ModificationType.WRITE, dtc.getRootNode().getModificationType());
181             assertEquals("getRootPath", CarsModel.newCarPath("sportage"), dtc.getRootPath());
182         });
183
184         ShardDataTree newDataTree = new ShardDataTree(mockShard, fullSchema, TreeType.OPERATIONAL);
185         addCar(newDataTree, "optima");
186         addCar(newDataTree, "murano");
187
188         shardDataTree.applySnapshot(newDataTree.takeStateSnapshot());
189
190         Map<YangInstanceIdentifier, ModificationType> expChanges = Maps.newHashMap();
191         expChanges.put(CarsModel.newCarPath("optima"), ModificationType.WRITE);
192         expChanges.put(CarsModel.newCarPath("murano"), ModificationType.WRITE);
193         expChanges.put(CarsModel.newCarPath("sportage"), ModificationType.DELETE);
194         verifyOnDataTreeChanged(listener, dtc -> {
195             ModificationType expType = expChanges.remove(dtc.getRootPath());
196             assertNotNull("Got unexpected change for " + dtc.getRootPath(), expType);
197             assertEquals("getModificationType", expType, dtc.getRootNode().getModificationType());
198         });
199
200         if (!expChanges.isEmpty()) {
201             fail("Missing change notifications: " + expChanges);
202         }
203     }
204
205     @Test
206     public void testPipelinedTransactions() throws Exception {
207         doReturn(false).when(mockShard).canSkipPayload();
208
209         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
210             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
211
212         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
213             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
214
215         NormalizedNode<?, ?> peopleNode = PeopleModel.create();
216         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
217             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
218
219         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
220         MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
221         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
222
223         immediateCanCommit(cohort1);
224         final FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
225         final FutureCallback<Void> canCommitCallback3 = coordinatedCanCommit(cohort3);
226         final FutureCallback<Void> canCommitCallback4 = coordinatedCanCommit(cohort4);
227
228         final FutureCallback<DataTreeCandidate> preCommitCallback1 = coordinatedPreCommit(cohort1);
229         verify(preCommitCallback1).onSuccess(cohort1.getCandidate());
230         verify(canCommitCallback2).onSuccess(null);
231
232         final FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
233         verify(preCommitCallback2).onSuccess(cohort2.getCandidate());
234         verify(canCommitCallback3).onSuccess(null);
235
236         final FutureCallback<DataTreeCandidate> preCommitCallback3 = coordinatedPreCommit(cohort3);
237         verify(preCommitCallback3).onSuccess(cohort3.getCandidate());
238         verify(canCommitCallback4).onSuccess(null);
239
240         final FutureCallback<DataTreeCandidate> preCommitCallback4 = coordinatedPreCommit(cohort4);
241         verify(preCommitCallback4).onSuccess(cohort4.getCandidate());
242
243         final FutureCallback<UnsignedLong> commitCallback2 = coordinatedCommit(cohort2);
244         verify(mockShard, never()).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class));
245         verifyNoMoreInteractions(commitCallback2);
246
247         final FutureCallback<UnsignedLong> commitCallback4 = coordinatedCommit(cohort4);
248         verify(mockShard, never()).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class));
249         verifyNoMoreInteractions(commitCallback4);
250
251         final FutureCallback<UnsignedLong> commitCallback1 = coordinatedCommit(cohort1);
252         InOrder inOrder = inOrder(mockShard);
253         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class));
254         inOrder.verify(mockShard).persistPayload(eq(cohort2.getIdentifier()), any(CommitTransactionPayload.class));
255         inOrder.verifyNoMoreInteractions();
256         verifyNoMoreInteractions(commitCallback1);
257         verifyNoMoreInteractions(commitCallback2);
258
259         final FutureCallback<UnsignedLong> commitCallback3 = coordinatedCommit(cohort3);
260         inOrder = inOrder(mockShard);
261         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class));
262         inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class));
263         inOrder.verifyNoMoreInteractions();
264         verifyNoMoreInteractions(commitCallback3);
265         verifyNoMoreInteractions(commitCallback4);
266
267         final ShardDataTreeCohort cohort5 = newShardDataTreeCohort(snapshot ->
268             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
269         final FutureCallback<Void> canCommitCallback5 = coordinatedCanCommit(cohort5);
270
271         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
272         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
273                 cohort1.getCandidate());
274         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
275         shardDataTree.applyReplicatedPayload(cohort2.getIdentifier(), mockPayload);
276         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
277         shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
278
279         inOrder = inOrder(commitCallback1, commitCallback2, commitCallback3, commitCallback4);
280         inOrder.verify(commitCallback1).onSuccess(any(UnsignedLong.class));
281         inOrder.verify(commitCallback2).onSuccess(any(UnsignedLong.class));
282         inOrder.verify(commitCallback3).onSuccess(any(UnsignedLong.class));
283         inOrder.verify(commitCallback4).onSuccess(any(UnsignedLong.class));
284
285         verify(canCommitCallback5).onSuccess(null);
286
287         final DataTreeSnapshot snapshot =
288                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
289         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
290         assertEquals("Car node present", true, optional.isPresent());
291         assertEquals("Car node", carNode, optional.get());
292
293         optional = snapshot.readNode(PeopleModel.BASE_PATH);
294         assertEquals("People node present", true, optional.isPresent());
295         assertEquals("People node", peopleNode, optional.get());
296     }
297
298     @SuppressWarnings("unchecked")
299     @Test
300     public void testAbortWithPendingCommits() throws Exception {
301         doReturn(false).when(mockShard).canSkipPayload();
302
303         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
304             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
305
306         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
307             snapshot.write(PeopleModel.BASE_PATH, PeopleModel.create()));
308
309         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
310             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
311
312         YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
313         MapEntryNode carNode = CarsModel.newCarEntry("optima", new BigInteger("100"));
314         final ShardDataTreeCohort cohort4 = newShardDataTreeCohort(snapshot -> snapshot.write(carPath, carNode));
315
316         coordinatedCanCommit(cohort2);
317         immediateCanCommit(cohort1);
318         coordinatedCanCommit(cohort3);
319         coordinatedCanCommit(cohort4);
320
321         coordinatedPreCommit(cohort1);
322         coordinatedPreCommit(cohort2);
323         coordinatedPreCommit(cohort3);
324
325         FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
326         doNothing().when(mockAbortCallback).onSuccess(null);
327         cohort2.abort(mockAbortCallback);
328         verify(mockAbortCallback).onSuccess(null);
329
330         coordinatedPreCommit(cohort4);
331         coordinatedCommit(cohort1);
332         coordinatedCommit(cohort3);
333         coordinatedCommit(cohort4);
334
335         InOrder inOrder = inOrder(mockShard);
336         inOrder.verify(mockShard).persistPayload(eq(cohort1.getIdentifier()), any(CommitTransactionPayload.class));
337         inOrder.verify(mockShard).persistPayload(eq(cohort3.getIdentifier()), any(CommitTransactionPayload.class));
338         inOrder.verify(mockShard).persistPayload(eq(cohort4.getIdentifier()), any(CommitTransactionPayload.class));
339         inOrder.verifyNoMoreInteractions();
340
341         // The payload instance doesn't matter - it just needs to be of type CommitTransactionPayload.
342         CommitTransactionPayload mockPayload = CommitTransactionPayload.create(nextTransactionId(),
343                 cohort1.getCandidate());
344         shardDataTree.applyReplicatedPayload(cohort1.getIdentifier(), mockPayload);
345         shardDataTree.applyReplicatedPayload(cohort3.getIdentifier(), mockPayload);
346         shardDataTree.applyReplicatedPayload(cohort4.getIdentifier(), mockPayload);
347
348         final DataTreeSnapshot snapshot =
349                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
350         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(carPath);
351         assertEquals("Car node present", true, optional.isPresent());
352         assertEquals("Car node", carNode, optional.get());
353     }
354
355     @SuppressWarnings("unchecked")
356     @Test
357     public void testAbortWithFailedRebase() throws Exception {
358         final ShardDataTreeCohort cohort1 = newShardDataTreeCohort(snapshot ->
359             snapshot.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()));
360
361         final ShardDataTreeCohort cohort2 = newShardDataTreeCohort(snapshot ->
362             snapshot.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()));
363
364         NormalizedNode<?, ?> peopleNode = PeopleModel.create();
365         final ShardDataTreeCohort cohort3 = newShardDataTreeCohort(snapshot ->
366             snapshot.write(PeopleModel.BASE_PATH, peopleNode));
367
368         immediateCanCommit(cohort1);
369         FutureCallback<Void> canCommitCallback2 = coordinatedCanCommit(cohort2);
370
371         coordinatedPreCommit(cohort1);
372         verify(canCommitCallback2).onSuccess(null);
373
374         FutureCallback<Void> mockAbortCallback = mock(FutureCallback.class);
375         doNothing().when(mockAbortCallback).onSuccess(null);
376         cohort1.abort(mockAbortCallback);
377         verify(mockAbortCallback).onSuccess(null);
378
379         FutureCallback<DataTreeCandidate> preCommitCallback2 = coordinatedPreCommit(cohort2);
380         verify(preCommitCallback2).onFailure(any(Throwable.class));
381
382         immediateCanCommit(cohort3);
383         immediatePreCommit(cohort3);
384         immediateCommit(cohort3);
385
386         final DataTreeSnapshot snapshot =
387                 shardDataTree.newReadOnlyTransaction(nextTransactionId()).getSnapshot();
388         Optional<NormalizedNode<?, ?>> optional = snapshot.readNode(PeopleModel.BASE_PATH);
389         assertEquals("People node present", true, optional.isPresent());
390         assertEquals("People node", peopleNode, optional.get());
391     }
392
393     private ShardDataTreeCohort newShardDataTreeCohort(final DataTreeOperation operation) {
394         final ReadWriteShardDataTreeTransaction transaction =
395                 shardDataTree.newReadWriteTransaction(nextTransactionId());
396         final DataTreeModification snapshot = transaction.getSnapshot();
397         operation.execute(snapshot);
398         return shardDataTree.finishTransaction(transaction);
399     }
400
401     @SuppressWarnings({ "rawtypes", "unchecked" })
402     private static void verifyOnDataTreeChanged(final DOMDataTreeChangeListener listener,
403             final Consumer<DataTreeCandidate> callback) {
404         ArgumentCaptor<Collection> changes = ArgumentCaptor.forClass(Collection.class);
405         verify(listener, atLeastOnce()).onDataTreeChanged(changes.capture());
406         for (Collection list : changes.getAllValues()) {
407             for (Object dtc : list) {
408                 callback.accept((DataTreeCandidate)dtc);
409             }
410         }
411
412         reset(listener);
413     }
414
415     private static NormalizedNode<?, ?> getCars(final ShardDataTree shardDataTree) {
416         final ReadOnlyShardDataTreeTransaction readOnlyShardDataTreeTransaction =
417                 shardDataTree.newReadOnlyTransaction(nextTransactionId());
418         final DataTreeSnapshot snapshot1 = readOnlyShardDataTreeTransaction.getSnapshot();
419
420         final Optional<NormalizedNode<?, ?>> optional = snapshot1.readNode(CarsModel.BASE_PATH);
421
422         assertEquals(true, optional.isPresent());
423
424         return optional.get();
425     }
426
427     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree)
428             throws ExecutionException, InterruptedException {
429         return addCar(shardDataTree, "altima");
430     }
431
432     private static DataTreeCandidate addCar(final ShardDataTree shardDataTree, final String name)
433             throws ExecutionException, InterruptedException {
434         return doTransaction(shardDataTree, snapshot -> {
435             snapshot.merge(CarsModel.BASE_PATH, CarsModel.emptyContainer());
436             snapshot.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
437             snapshot.write(CarsModel.newCarPath(name), CarsModel.newCarEntry(name, new BigInteger("100")));
438         });
439     }
440
441     private static DataTreeCandidate removeCar(final ShardDataTree shardDataTree)
442             throws ExecutionException, InterruptedException {
443         return doTransaction(shardDataTree, snapshot -> snapshot.delete(CarsModel.newCarPath("altima")));
444     }
445
446     @FunctionalInterface
447     private interface DataTreeOperation {
448         void execute(DataTreeModification snapshot);
449     }
450
451     private static DataTreeCandidate doTransaction(final ShardDataTree shardDataTree,
452             final DataTreeOperation operation) throws ExecutionException, InterruptedException {
453         final ReadWriteShardDataTreeTransaction transaction =
454                 shardDataTree.newReadWriteTransaction(nextTransactionId());
455         final DataTreeModification snapshot = transaction.getSnapshot();
456         operation.execute(snapshot);
457         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
458
459         immediateCanCommit(cohort);
460         immediatePreCommit(cohort);
461         final DataTreeCandidate candidate = cohort.getCandidate();
462         immediateCommit(cohort);
463
464         return candidate;
465     }
466
467     private static DataTreeCandidate applyCandidates(final ShardDataTree shardDataTree,
468             final List<DataTreeCandidate> candidates) throws ExecutionException, InterruptedException {
469         final ReadWriteShardDataTreeTransaction transaction =
470                 shardDataTree.newReadWriteTransaction(nextTransactionId());
471         final DataTreeModification snapshot = transaction.getSnapshot();
472         for (final DataTreeCandidate candidateTip : candidates) {
473             DataTreeCandidates.applyToModification(snapshot, candidateTip);
474         }
475         final ShardDataTreeCohort cohort = shardDataTree.finishTransaction(transaction);
476
477         immediateCanCommit(cohort);
478         immediatePreCommit(cohort);
479         final DataTreeCandidate candidate = cohort.getCandidate();
480         immediateCommit(cohort);
481
482         return candidate;
483     }
484 }