b1587bbcfb476f3bfdf9ae0de3cc19980b4f4083
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ConcurrentDOMDataBrokerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.inOrder;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.util.concurrent.CheckedFuture;
24 import com.google.common.util.concurrent.FutureCallback;
25 import com.google.common.util.concurrent.Futures;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import com.google.common.util.concurrent.MoreExecutors;
28 import com.google.common.util.concurrent.SettableFuture;
29 import com.google.common.util.concurrent.Uninterruptibles;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.List;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.SynchronousQueue;
37 import java.util.concurrent.ThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.concurrent.atomic.AtomicReference;
41 import org.junit.After;
42 import org.junit.Before;
43 import org.junit.Test;
44 import org.mockito.InOrder;
45 import org.mockito.invocation.InvocationOnMock;
46 import org.mockito.stubbing.Answer;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
49 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
50 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
51 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
52 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
53 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
55 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
56 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
57 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
58 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
59 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
60 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
61 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
65
66 /**
 * Unit tests for ConcurrentDOMDataBroker.
68  *
69  * @author Thomas Pantelis
70  */
71 public class ConcurrentDOMDataBrokerTest {
72
73     private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class);
74     private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
75     private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
76     private final ThreadPoolExecutor futureExecutor =
77             new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
78     private ConcurrentDOMDataBroker coordinator;
79
80     @Before
81     public void setup() {
82         doReturn("tx").when(transaction).getIdentifier();
83
84         DOMStore store = new InMemoryDOMDataStore("OPER",
85             MoreExecutors.sameThreadExecutor());
86
87         coordinator = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, store), futureExecutor);
88     }
89
90     @After
91     public void tearDown() {
92         futureExecutor.shutdownNow();
93     }
94
95     @Test
96     public void testSuccessfulSubmitAsync() throws Throwable {
97         testSuccessfulSubmit(true);
98     }
99
100     @Test
101     public void testSuccessfulSubmitSync() throws Throwable {
102         testSuccessfulSubmit(false);
103     }
104
105     private void testSuccessfulSubmit(final boolean doAsync) throws Throwable {
106         final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1);
107         Answer<ListenableFuture<Boolean>> asyncCanCommit = new Answer<ListenableFuture<Boolean>>() {
108             @Override
109             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
110                 final SettableFuture<Boolean> future = SettableFuture.create();
111                 if(doAsync) {
112                     new Thread() {
113                         @Override
114                         public void run() {
115                             Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
116                                     10, TimeUnit.SECONDS);
117                             future.set(true);
118                         }
119                     }.start();
120                 } else {
121                     future.set(true);
122                 }
123
124                 return future;
125             }
126         };
127
128         doAnswer(asyncCanCommit).when(mockCohort1).canCommit();
129         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
130         doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
131
132         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
133         doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
134         doReturn(Futures.immediateFuture(null)).when(mockCohort2).commit();
135
136         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
137                 transaction, Arrays.asList(mockCohort1, mockCohort2));
138
139         final CountDownLatch doneLatch = new CountDownLatch(1);
140         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
141         Futures.addCallback(future, new FutureCallback<Void>() {
142             @Override
143             public void onSuccess(final Void result) {
144                 doneLatch.countDown();
145             }
146
147             @Override
148             public void onFailure(final Throwable t) {
149                 caughtEx.set(t);
150                 doneLatch.countDown();
151             }
152         });
153
154         asyncCanCommitContinue.countDown();
155
156         assertEquals("Submit complete", true, doneLatch.await(5, TimeUnit.SECONDS));
157
158         if(caughtEx.get() != null) {
159             throw caughtEx.get();
160         }
161
162         assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
163
164         InOrder inOrder = inOrder(mockCohort1, mockCohort2);
165         inOrder.verify(mockCohort1).canCommit();
166         inOrder.verify(mockCohort2).canCommit();
167         inOrder.verify(mockCohort1).preCommit();
168         inOrder.verify(mockCohort2).preCommit();
169         inOrder.verify(mockCohort1).commit();
170         inOrder.verify(mockCohort2).commit();
171     }
172
173     @Test
174     public void testSubmitWithNegativeCanCommitResponse() throws Exception {
175         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
176         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
177
178         doReturn(Futures.immediateFuture(false)).when(mockCohort2).canCommit();
179         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
180
181         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
182         doReturn(Futures.immediateFuture(false)).when(mockCohort3).canCommit();
183         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
184
185         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
186                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
187
188         assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
189     }
190
191     private static void assertFailure(final CheckedFuture<Void, TransactionCommitFailedException> future,
192             final Exception expCause, final DOMStoreThreePhaseCommitCohort... mockCohorts)
193                     throws Exception {
194         try {
195             future.checkedGet(5, TimeUnit.SECONDS);
196             fail("Expected TransactionCommitFailedException");
197         } catch (TransactionCommitFailedException e) {
198             if(expCause != null) {
199                 assertSame("Expected cause", expCause.getClass(), e.getCause().getClass());
200             }
201
202             InOrder inOrder = inOrder((Object[])mockCohorts);
203             for(DOMStoreThreePhaseCommitCohort c: mockCohorts) {
204                 inOrder.verify(c).abort();
205             }
206         } catch (TimeoutException e) {
207             throw e;
208         }
209     }
210
211     @Test
212     public void testSubmitWithCanCommitException() throws Exception {
213         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
214         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
215
216         IllegalStateException cause = new IllegalStateException("mock");
217         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
218         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
219
220         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
221                 transaction, Arrays.asList(mockCohort1, mockCohort2));
222
223         assertFailure(future, cause, mockCohort1, mockCohort2);
224     }
225
226     @Test
227     public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception {
228         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
229         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
230         NoShardLeaderException rootCause = new NoShardLeaderException("mock");
231         DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause);
232         doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit();
233         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
234
235         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
236             transaction, Arrays.asList(mockCohort1, mockCohort2));
237
238         assertFailure(future, cause, mockCohort1, mockCohort2);
239     }
240
241     @Test
242     public void testSubmitWithPreCommitException() throws Exception {
243         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
244         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
245         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
246
247         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
248         IllegalStateException cause = new IllegalStateException("mock");
249         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit();
250         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
251
252         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
253         doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
254         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))).
255                 when(mockCohort3).preCommit();
256         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
257
258         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
259                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
260
261         assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
262     }
263
264     @Test
265     public void testSubmitWithCommitException() throws Exception {
266         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
267         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
268         doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
269         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
270
271         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
272         doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
273         IllegalStateException cause = new IllegalStateException("mock");
274         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit();
275         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
276
277         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
278         doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
279         doReturn(Futures.immediateFuture(null)).when(mockCohort3).preCommit();
280         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))).
281                 when(mockCohort3).commit();
282         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
283
284         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
285                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
286
287         assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
288     }
289
290     @Test
291     public void testSubmitWithAbortException() throws Exception {
292         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
293         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error"))).
294                 when(mockCohort1).abort();
295
296         IllegalStateException cause = new IllegalStateException("mock canCommit error");
297         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
298         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
299
300         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
301                 transaction, Arrays.asList(mockCohort1, mockCohort2));
302
303         assertFailure(future, cause, mockCohort1, mockCohort2);
304     }
305
306     @Test
307     public void testCreateReadWriteTransaction(){
308         DOMStore domStore = mock(DOMStore.class);
309         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
310                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
311                 futureExecutor)) {
312             dataBroker.newReadWriteTransaction();
313
314             verify(domStore, never()).newReadWriteTransaction();
315         }
316     }
317
318     @Test
319     public void testCreateWriteOnlyTransaction(){
320         DOMStore domStore = mock(DOMStore.class);
321         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
322                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
323                 futureExecutor)) {
324             dataBroker.newWriteOnlyTransaction();
325
326             verify(domStore, never()).newWriteOnlyTransaction();
327         }
328     }
329
330     @Test
331     public void testCreateReadOnlyTransaction(){
332         DOMStore domStore = mock(DOMStore.class);
333         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
334                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
335                 futureExecutor)) {
336             dataBroker.newReadOnlyTransaction();
337
338             verify(domStore, never()).newReadOnlyTransaction();
339         }
340     }
341
342     @Test
343     public void testLazySubTransactionCreationForReadWriteTransactions(){
344         DOMStore configDomStore = mock(DOMStore.class);
345         DOMStore operationalDomStore = mock(DOMStore.class);
346         DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class);
347
348         doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction();
349         doReturn(storeTxn).when(configDomStore).newReadWriteTransaction();
350
351         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
352                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
353                 configDomStore), futureExecutor)) {
354             DOMDataReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction();
355
356             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
357             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
358             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
359
360             verify(configDomStore, never()).newReadWriteTransaction();
361             verify(operationalDomStore, times(1)).newReadWriteTransaction();
362
363             dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
364
365             verify(configDomStore, times(1)).newReadWriteTransaction();
366             verify(operationalDomStore, times(1)).newReadWriteTransaction();
367         }
368
369     }
370
371     @Test
372     public void testLazySubTransactionCreationForWriteOnlyTransactions(){
373         DOMStore configDomStore = mock(DOMStore.class);
374         DOMStore operationalDomStore = mock(DOMStore.class);
375         DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class);
376
377         doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction();
378         doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction();
379
380         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
381                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
382                 configDomStore), futureExecutor)) {
383             DOMDataWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction();
384
385             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
386             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
387
388             verify(configDomStore, never()).newWriteOnlyTransaction();
389             verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
390
391             dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
392
393             verify(configDomStore, times(1)).newWriteOnlyTransaction();
394             verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
395         }
396     }
397
398     @Test
399     public void testLazySubTransactionCreationForReadOnlyTransactions(){
400         DOMStore configDomStore = mock(DOMStore.class);
401         DOMStore operationalDomStore = mock(DOMStore.class);
402         DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class);
403
404         doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction();
405         doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction();
406
407         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
408                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
409                 configDomStore), futureExecutor)) {
410             DOMDataReadOnlyTransaction dataTxn = dataBroker.newReadOnlyTransaction();
411
412             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
413             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
414
415             verify(configDomStore, never()).newReadOnlyTransaction();
416             verify(operationalDomStore, times(1)).newReadOnlyTransaction();
417
418             dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build());
419
420             verify(configDomStore, times(1)).newReadOnlyTransaction();
421             verify(operationalDomStore, times(1)).newReadOnlyTransaction();
422         }
423     }
424
425     @Test
426     public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException {
427         DOMStore configDomStore = mock(DOMStore.class);
428         DOMStore operationalDomStore = mock(DOMStore.class);
429         DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
430         DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
431
432         doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
433         doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
434         doReturn(Futures.immediateFuture(false)).when(mockCohort).canCommit();
435         doReturn(Futures.immediateFuture(null)).when(mockCohort).abort();
436
437         final CountDownLatch latch = new CountDownLatch(1);
438         final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
439
440         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
441                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
442                 configDomStore), futureExecutor) {
443             @Override
444             public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction, Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
445                 commitCohorts.addAll(cohorts);
446                 latch.countDown();
447                 return super.submit(transaction, cohorts);
448             }
449         }) {
450             DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
451
452             domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
453
454             domDataReadWriteTransaction.submit();
455
456             latch.await(10, TimeUnit.SECONDS);
457
458             assertTrue(commitCohorts.size() == 1);
459         }
460     }
461
462     @Test
463     public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException {
464         DOMStore configDomStore = mock(DOMStore.class);
465         DOMStore operationalDomStore = mock(DOMStore.class);
466         DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
467         DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class);
468         DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class);
469         DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class);
470
471         doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction();
472         doReturn(configTransaction).when(configDomStore).newReadWriteTransaction();
473
474         doReturn(mockCohortOperational).when(operationalTransaction).ready();
475         doReturn(Futures.immediateFuture(false)).when(mockCohortOperational).canCommit();
476         doReturn(Futures.immediateFuture(null)).when(mockCohortOperational).abort();
477
478         doReturn(mockCohortConfig).when(configTransaction).ready();
479         doReturn(Futures.immediateFuture(false)).when(mockCohortConfig).canCommit();
480         doReturn(Futures.immediateFuture(null)).when(mockCohortConfig).abort();
481
482         final CountDownLatch latch = new CountDownLatch(1);
483         final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
484
485         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
486                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
487                 configDomStore), futureExecutor) {
488             @Override
489             public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction, Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
490                 commitCohorts.addAll(cohorts);
491                 latch.countDown();
492                 return super.submit(transaction, cohorts);
493             }
494         }) {
495             DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
496
497             domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
498             domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
499
500             domDataReadWriteTransaction.submit();
501
502             latch.await(10, TimeUnit.SECONDS);
503
504             assertTrue(commitCohorts.size() == 2);
505         }
506     }
507
508     @Test
509     public void testCreateTransactionChain(){
510         DOMStore domStore = mock(DOMStore.class);
511         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
512                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
513                 futureExecutor)) {
514
515             dataBroker.createTransactionChain(mock(TransactionChainListener.class));
516
517             verify(domStore, times(2)).createTransactionChain();
518         }
519
520     }
521
522     @Test
523     public void testCreateTransactionOnChain(){
524         DOMStore domStore = mock(DOMStore.class);
525         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
526                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
527                 futureExecutor)) {
528
529             DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
530             DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class);
531
532             doReturn(mockChain).when(domStore).createTransactionChain();
533             doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction();
534
535             DOMTransactionChain transactionChain = dataBroker.createTransactionChain(mock(TransactionChainListener.class));
536
537             DOMDataWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction();
538
539             verify(mockChain, never()).newWriteOnlyTransaction();
540
541             domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
542         }
543     }
544
545     @Test
546     public void testEmptyTransactionSubmitSucceeds() throws ExecutionException, InterruptedException {
547         DOMStore domStore = mock(DOMStore.class);
548         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
549                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
550                 futureExecutor)) {
551
552             CheckedFuture<Void, TransactionCommitFailedException> submit1 = dataBroker.newWriteOnlyTransaction().submit();
553
554             assertNotNull(submit1);
555
556             submit1.get();
557
558             CheckedFuture<Void, TransactionCommitFailedException> submit2 = dataBroker.newReadWriteTransaction().submit();
559
560             assertNotNull(submit2);
561
562             submit2.get();
563         }
564     }
565
566 }