Convert mdsal submit() calls to commit()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / ConcurrentDOMDataBrokerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.inOrder;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22
23 import com.google.common.base.Throwables;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.util.concurrent.FluentFuture;
26 import com.google.common.util.concurrent.FutureCallback;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.MoreExecutors;
30 import com.google.common.util.concurrent.SettableFuture;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.SynchronousQueue;
40 import java.util.concurrent.ThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicReference;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.mockito.stubbing.Answer;
49 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
51 import org.opendaylight.mdsal.common.api.CommitInfo;
52 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
53 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
54 import org.opendaylight.mdsal.common.api.TransactionChainListener;
55 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
56 import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
64 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
65 import org.opendaylight.mdsal.dom.broker.TransactionCommitFailedExceptionMapper;
66 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
67 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
68 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
69 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
70 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
71 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
72 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStore;
73 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
74 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
75
76 /**
77  * Unit tests for DOMConcurrentDataCommitCoordinator.
78  *
79  * @author Thomas Pantelis
80  */
81 public class ConcurrentDOMDataBrokerTest {
82
83     private final DOMDataTreeWriteTransaction transaction = mock(DOMDataTreeWriteTransaction.class);
84     private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
85     private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
86     private final ThreadPoolExecutor futureExecutor =
87             new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<>());
88     private ConcurrentDOMDataBroker coordinator;
89
90     @Before
91     public void setup() {
92         doReturn("tx").when(transaction).getIdentifier();
93
94         DOMStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
95
96         coordinator = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, store),
97                 futureExecutor);
98     }
99
100     @After
101     public void tearDown() {
102         futureExecutor.shutdownNow();
103     }
104
105     @Test
106     public void testSuccessfulSubmitAsync() throws Exception {
107         testSuccessfulSubmit(true);
108     }
109
110     @Test
111     public void testSuccessfulSubmitSync() throws Exception {
112         testSuccessfulSubmit(false);
113     }
114
115     private void testSuccessfulSubmit(final boolean doAsync) throws InterruptedException {
116         final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1);
117         Answer<ListenableFuture<Boolean>> asyncCanCommit = invocation -> {
118             final SettableFuture<Boolean> future = SettableFuture.create();
119             if (doAsync) {
120                 new Thread(() -> {
121                     Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
122                             10, TimeUnit.SECONDS);
123                     future.set(true);
124                 }).start();
125             } else {
126                 future.set(true);
127             }
128
129             return future;
130         };
131
132         doAnswer(asyncCanCommit).when(mockCohort1).canCommit();
133         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
134         doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
135
136         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
137         doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
138         doReturn(Futures.immediateFuture(null)).when(mockCohort2).commit();
139
140         ListenableFuture<? extends CommitInfo> future =
141                 coordinator.commit(transaction, Arrays.asList(mockCohort1, mockCohort2));
142
143         final CountDownLatch doneLatch = new CountDownLatch(1);
144         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
145         Futures.addCallback(future, new FutureCallback<CommitInfo>() {
146             @Override
147             public void onSuccess(final CommitInfo result) {
148                 doneLatch.countDown();
149             }
150
151             @Override
152             public void onFailure(final Throwable failure) {
153                 caughtEx.set(failure);
154                 doneLatch.countDown();
155             }
156         }, MoreExecutors.directExecutor());
157
158         asyncCanCommitContinue.countDown();
159
160         assertEquals("Submit complete", true, doneLatch.await(5, TimeUnit.SECONDS));
161
162         if (caughtEx.get() != null) {
163             Throwables.throwIfUnchecked(caughtEx.get());
164             throw new RuntimeException(caughtEx.get());
165         }
166
167         assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
168
169         InOrder inOrder = inOrder(mockCohort1, mockCohort2);
170         inOrder.verify(mockCohort1).canCommit();
171         inOrder.verify(mockCohort2).canCommit();
172         inOrder.verify(mockCohort1).preCommit();
173         inOrder.verify(mockCohort2).preCommit();
174         inOrder.verify(mockCohort1).commit();
175         inOrder.verify(mockCohort2).commit();
176     }
177
178     @Test
179     public void testSubmitWithNegativeCanCommitResponse() throws Exception {
180         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
181         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
182
183         doReturn(Futures.immediateFuture(false)).when(mockCohort2).canCommit();
184         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
185
186         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
187         doReturn(Futures.immediateFuture(false)).when(mockCohort3).canCommit();
188         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
189
190         ListenableFuture<? extends CommitInfo> future = coordinator.commit(
191                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
192
193         assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
194     }
195
196     private static void assertFailure(final ListenableFuture<?> future,
197             final Exception expCause, final DOMStoreThreePhaseCommitCohort... mockCohorts)
198                     throws Exception {
199         try {
200             future.get(5, TimeUnit.SECONDS);
201             fail("Expected TransactionCommitFailedException");
202         } catch (ExecutionException e) {
203             TransactionCommitFailedException tcf = TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
204             if (expCause != null) {
205                 assertSame("Expected cause", expCause.getClass(), tcf.getCause().getClass());
206             }
207
208             InOrder inOrder = inOrder((Object[])mockCohorts);
209             for (DOMStoreThreePhaseCommitCohort c: mockCohorts) {
210                 inOrder.verify(c).abort();
211             }
212         } catch (TimeoutException e) {
213             throw e;
214         }
215     }
216
217     @Test
218     public void testSubmitWithCanCommitException() throws Exception {
219         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
220         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
221
222         IllegalStateException cause = new IllegalStateException("mock");
223         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
224         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
225
226         FluentFuture<? extends CommitInfo> future = coordinator.commit(
227                 transaction, Arrays.asList(mockCohort1, mockCohort2));
228
229         assertFailure(future, cause, mockCohort1, mockCohort2);
230     }
231
232     @Test
233     public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception {
234         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
235         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
236         NoShardLeaderException rootCause = new NoShardLeaderException("mock");
237         DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause);
238         doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit();
239         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
240
241         FluentFuture<? extends CommitInfo> future = coordinator.commit(
242             transaction, Arrays.asList(mockCohort1, mockCohort2));
243
244         assertFailure(future, cause, mockCohort1, mockCohort2);
245     }
246
247     @Test
248     public void testSubmitWithPreCommitException() throws Exception {
249         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
250         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
251         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
252
253         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
254         IllegalStateException cause = new IllegalStateException("mock");
255         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit();
256         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
257
258         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
259         doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
260         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2")))
261                 .when(mockCohort3).preCommit();
262         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
263
264         FluentFuture<? extends CommitInfo> future = coordinator.commit(
265                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
266
267         assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
268     }
269
270     @Test
271     public void testSubmitWithCommitException() throws Exception {
272         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
273         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
274         doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
275         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
276
277         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
278         doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
279         IllegalStateException cause = new IllegalStateException("mock");
280         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit();
281         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
282
283         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
284         doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
285         doReturn(Futures.immediateFuture(null)).when(mockCohort3).preCommit();
286         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2")))
287                 .when(mockCohort3).commit();
288         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
289
290         FluentFuture<? extends CommitInfo> future = coordinator.commit(
291                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
292
293         assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
294     }
295
296     @Test
297     public void testSubmitWithAbortException() throws Exception {
298         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
299         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error")))
300                 .when(mockCohort1).abort();
301
302         IllegalStateException cause = new IllegalStateException("mock canCommit error");
303         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
304         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
305
306         FluentFuture<? extends CommitInfo> future = coordinator.commit(
307                 transaction, Arrays.asList(mockCohort1, mockCohort2));
308
309         assertFailure(future, cause, mockCohort1, mockCohort2);
310     }
311
312     @Test
313     public void testCreateReadWriteTransaction() {
314         DOMStore domStore = mock(DOMStore.class);
315         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
316                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
317                 futureExecutor)) {
318             dataBroker.newReadWriteTransaction();
319
320             verify(domStore, never()).newReadWriteTransaction();
321         }
322     }
323
324     @Test
325     public void testCreateWriteOnlyTransaction() {
326         DOMStore domStore = mock(DOMStore.class);
327         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
328                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
329                 futureExecutor)) {
330             dataBroker.newWriteOnlyTransaction();
331
332             verify(domStore, never()).newWriteOnlyTransaction();
333         }
334     }
335
336     @Test
337     public void testCreateReadOnlyTransaction() {
338         DOMStore domStore = mock(DOMStore.class);
339         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
340                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
341                 futureExecutor)) {
342             dataBroker.newReadOnlyTransaction();
343
344             verify(domStore, never()).newReadOnlyTransaction();
345         }
346     }
347
348     @Test
349     public void testLazySubTransactionCreationForReadWriteTransactions() {
350         DOMStore configDomStore = mock(DOMStore.class);
351         DOMStore operationalDomStore = mock(DOMStore.class);
352         DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class);
353
354         doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction();
355         doReturn(storeTxn).when(configDomStore).newReadWriteTransaction();
356
357         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
358                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
359                 configDomStore), futureExecutor)) {
360             DOMDataTreeReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction();
361
362             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY, mock(NormalizedNode.class));
363             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY, mock(NormalizedNode.class));
364             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY);
365
366             verify(configDomStore, never()).newReadWriteTransaction();
367             verify(operationalDomStore, times(1)).newReadWriteTransaction();
368
369             dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY, mock(NormalizedNode.class));
370
371             verify(configDomStore, times(1)).newReadWriteTransaction();
372             verify(operationalDomStore, times(1)).newReadWriteTransaction();
373         }
374
375     }
376
377     @Test
378     public void testLazySubTransactionCreationForWriteOnlyTransactions() {
379         DOMStore configDomStore = mock(DOMStore.class);
380         DOMStore operationalDomStore = mock(DOMStore.class);
381         DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class);
382
383         doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction();
384         doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction();
385
386         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
387                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
388                 configDomStore), futureExecutor)) {
389             DOMDataTreeWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction();
390
391             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY, mock(NormalizedNode.class));
392             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY, mock(NormalizedNode.class));
393
394             verify(configDomStore, never()).newWriteOnlyTransaction();
395             verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
396
397             dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY, mock(NormalizedNode.class));
398
399             verify(configDomStore, times(1)).newWriteOnlyTransaction();
400             verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
401         }
402     }
403
404     @Test
405     public void testLazySubTransactionCreationForReadOnlyTransactions() {
406         DOMStore configDomStore = mock(DOMStore.class);
407         DOMStore operationalDomStore = mock(DOMStore.class);
408         DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class);
409
410         doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction();
411         doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction();
412
413         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
414                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
415                 configDomStore), futureExecutor)) {
416             DOMDataTreeReadTransaction dataTxn = dataBroker.newReadOnlyTransaction();
417
418             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY);
419             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY);
420
421             verify(configDomStore, never()).newReadOnlyTransaction();
422             verify(operationalDomStore, times(1)).newReadOnlyTransaction();
423
424             dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY);
425
426             verify(configDomStore, times(1)).newReadOnlyTransaction();
427             verify(operationalDomStore, times(1)).newReadOnlyTransaction();
428         }
429     }
430
431     @Test
432     public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException {
433         DOMStore configDomStore = mock(DOMStore.class);
434         DOMStore operationalDomStore = mock(DOMStore.class);
435         DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
436         DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
437
438         doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
439         doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
440         doReturn(Futures.immediateFuture(false)).when(mockCohort).canCommit();
441         doReturn(Futures.immediateFuture(null)).when(mockCohort).abort();
442
443         final CountDownLatch latch = new CountDownLatch(1);
444         final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
445
446         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
447                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
448                 configDomStore), futureExecutor) {
449             @Override
450             public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
451                     Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
452                 commitCohorts.addAll(cohorts);
453                 latch.countDown();
454                 return super.commit(writeTx, cohorts);
455             }
456         }) {
457             DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
458
459             domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY);
460
461             domDataReadWriteTransaction.commit();
462
463             assertTrue(latch.await(10, TimeUnit.SECONDS));
464
465             assertTrue(commitCohorts.size() == 1);
466         }
467     }
468
469     @Test
470     public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException {
471         DOMStore configDomStore = mock(DOMStore.class);
472         DOMStore operationalDomStore = mock(DOMStore.class);
473         DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
474         DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class);
475         DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class);
476         DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class);
477
478         doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction();
479         doReturn(configTransaction).when(configDomStore).newReadWriteTransaction();
480
481         doReturn(mockCohortOperational).when(operationalTransaction).ready();
482         doReturn(Futures.immediateFuture(false)).when(mockCohortOperational).canCommit();
483         doReturn(Futures.immediateFuture(null)).when(mockCohortOperational).abort();
484
485         doReturn(mockCohortConfig).when(configTransaction).ready();
486         doReturn(Futures.immediateFuture(false)).when(mockCohortConfig).canCommit();
487         doReturn(Futures.immediateFuture(null)).when(mockCohortConfig).abort();
488
489         final CountDownLatch latch = new CountDownLatch(1);
490         final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
491
492         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
493                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
494                 configDomStore), futureExecutor) {
495             @Override
496             @SuppressWarnings("checkstyle:hiddenField")
497             public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
498                     Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
499                 commitCohorts.addAll(cohorts);
500                 latch.countDown();
501                 return super.commit(writeTx, cohorts);
502             }
503         }) {
504             DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
505
506             domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY,
507                     mock(NormalizedNode.class));
508             domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY,
509                     mock(NormalizedNode.class));
510
511             domDataReadWriteTransaction.commit();
512
513             assertTrue(latch.await(10, TimeUnit.SECONDS));
514
515             assertTrue(commitCohorts.size() == 2);
516         }
517     }
518
519     @Test
520     public void testCreateTransactionChain() {
521         DOMStore domStore = mock(DOMStore.class);
522         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
523                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
524                 futureExecutor)) {
525
526             dataBroker.createTransactionChain(mock(TransactionChainListener.class));
527
528             verify(domStore, times(2)).createTransactionChain();
529         }
530
531     }
532
533     @Test
534     public void testCreateTransactionOnChain() {
535         DOMStore domStore = mock(DOMStore.class);
536         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
537                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
538                 futureExecutor)) {
539
540             DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
541             DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class);
542
543             doReturn(mockChain).when(domStore).createTransactionChain();
544             doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction();
545
546             DOMTransactionChain transactionChain = dataBroker.createTransactionChain(
547                     mock(TransactionChainListener.class));
548
549             DOMDataTreeWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction();
550
551             verify(mockChain, never()).newWriteOnlyTransaction();
552
553             domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.EMPTY,
554                     mock(NormalizedNode.class));
555         }
556     }
557
558     @Test
559     public void testEmptyTransactionSubmitSucceeds() throws ExecutionException, InterruptedException {
560         DOMStore domStore = mock(DOMStore.class);
561         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
562                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
563                 futureExecutor)) {
564
565             FluentFuture<? extends CommitInfo> submit1 = dataBroker.newWriteOnlyTransaction().commit();
566
567             assertNotNull(submit1);
568
569             submit1.get();
570
571             FluentFuture<? extends CommitInfo> submit2 = dataBroker.newReadWriteTransaction().commit();
572
573             assertNotNull(submit2);
574
575             submit2.get();
576         }
577     }
578
579     @Test
580     public void testExtensions() {
581         DistributedDataStore mockConfigStore = mock(DistributedDataStore.class);
582         DistributedDataStore mockOperStore = mock(DistributedDataStore.class);
583         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
584                 LogicalDatastoreType.OPERATIONAL, mockOperStore,
585                 LogicalDatastoreType.CONFIGURATION, mockConfigStore), futureExecutor)) {
586
587             Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> supportedExtensions =
588                     dataBroker.getSupportedExtensions();
589             assertNotNull(supportedExtensions.get(DOMDataTreeChangeService.class));
590
591             DOMDataTreeCommitCohortRegistry cohortRegistry =
592                     (DOMDataTreeCommitCohortRegistry) supportedExtensions.get(DOMDataTreeCommitCohortRegistry.class);
593             assertNotNull(cohortRegistry);
594
595             DOMDataTreeCommitCohort mockCohort = mock(DOMDataTreeCommitCohort.class);
596             DOMDataTreeIdentifier path = new DOMDataTreeIdentifier(
597                     org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION,
598                     YangInstanceIdentifier.EMPTY);
599             cohortRegistry.registerCommitCohort(path, mockCohort);
600
601             verify(mockConfigStore).registerCommitCohort(path, mockCohort);
602         }
603     }
604 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.