Migrate some tests to new ImmutableNodes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / ConcurrentDOMDataBrokerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.inOrder;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFalseFluentFuture;
23 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateNullFluentFuture;
24 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateTrueFluentFuture;
25
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.FluentFuture;
29 import com.google.common.util.concurrent.FutureCallback;
30 import com.google.common.util.concurrent.Futures;
31 import com.google.common.util.concurrent.ListenableFuture;
32 import com.google.common.util.concurrent.MoreExecutors;
33 import com.google.common.util.concurrent.SettableFuture;
34 import com.google.common.util.concurrent.Uninterruptibles;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.SynchronousQueue;
40 import java.util.concurrent.ThreadPoolExecutor;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.concurrent.atomic.AtomicReference;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.mockito.InOrder;
48 import org.mockito.stubbing.Answer;
49 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
50 import org.opendaylight.mdsal.common.api.CommitInfo;
51 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
52 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
53 import org.opendaylight.mdsal.dom.api.DOMDataBroker.CommitCohortExtension;
54 import org.opendaylight.mdsal.dom.api.DOMDataBroker.DataTreeChangeExtension;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
56 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
60 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
61 import org.opendaylight.mdsal.dom.spi.TransactionCommitFailedExceptionMapper;
62 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
63 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
64 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
65 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
66 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
67 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
68 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStore;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
70 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
71
/**
 * Unit tests for ConcurrentDOMDataBroker.
 *
 * @author Thomas Pantelis
 */
77 public class ConcurrentDOMDataBrokerTest {
78
    // Mocked write transaction passed to the broker's commit(); setup() stubs its identifier.
    private final DOMDataTreeWriteTransaction transaction = mock(DOMDataTreeWriteTransaction.class);
    // Mocked three-phase commit cohort; each test stubs the phases it cares about.
    private final DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
    // Executor handed to every broker created by these tests: zero core threads, at most one thread,
    // and a SynchronousQueue, so tasks are never queued.
    private final ThreadPoolExecutor futureExecutor =
            new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<>());
    // Broker under test for the commit-phase tests; assigned in setup().
    private ConcurrentDOMDataBroker coordinator;
84
85     @Before
86     public void setup() {
87         doReturn("tx").when(transaction).getIdentifier();
88
89         DOMStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
90
91         coordinator = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, store),
92                 futureExecutor);
93     }
94
95     @After
96     public void tearDown() {
97         futureExecutor.shutdownNow();
98     }
99
100     @Test
101     public void testSuccessfulSubmitAsync() throws Exception {
102         testSuccessfulSubmit(true);
103     }
104
105     @Test
106     public void testSuccessfulSubmitSync() throws Exception {
107         testSuccessfulSubmit(false);
108     }
109
110     private void testSuccessfulSubmit(final boolean doAsync) throws InterruptedException {
111         final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1);
112         Answer<ListenableFuture<Boolean>> asyncCanCommit = invocation -> {
113             final SettableFuture<Boolean> future = SettableFuture.create();
114             if (doAsync) {
115                 new Thread(() -> {
116                     Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue, 10, TimeUnit.SECONDS);
117                     future.set(Boolean.TRUE);
118                 }).start();
119             } else {
120                 future.set(Boolean.TRUE);
121             }
122
123             return future;
124         };
125
126         doAnswer(asyncCanCommit).when(mockCohort).canCommit();
127         doReturn(immediateNullFluentFuture()).when(mockCohort).preCommit();
128         doReturn(immediateNullFluentFuture()).when(mockCohort).commit();
129
130         ListenableFuture<? extends CommitInfo> future = coordinator.commit(transaction, mockCohort);
131
132         final CountDownLatch doneLatch = new CountDownLatch(1);
133         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
134         Futures.addCallback(future, new FutureCallback<CommitInfo>() {
135             @Override
136             public void onSuccess(final CommitInfo result) {
137                 doneLatch.countDown();
138             }
139
140             @Override
141             public void onFailure(final Throwable failure) {
142                 caughtEx.set(failure);
143                 doneLatch.countDown();
144             }
145         }, MoreExecutors.directExecutor());
146
147         asyncCanCommitContinue.countDown();
148
149         assertTrue("Submit complete", doneLatch.await(5, TimeUnit.SECONDS));
150
151         if (caughtEx.get() != null) {
152             Throwables.throwIfUnchecked(caughtEx.get());
153             throw new RuntimeException(caughtEx.get());
154         }
155
156         assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
157
158         InOrder inOrder = inOrder(mockCohort);
159         inOrder.verify(mockCohort, times(1)).canCommit();
160         inOrder.verify(mockCohort, times(1)).preCommit();
161         inOrder.verify(mockCohort, times(1)).commit();
162     }
163
164     @Test
165     public void testSubmitWithNegativeCanCommitResponse() throws Exception {
166         doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort).canCommit();
167         doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
168
169         assertFailure(coordinator.commit(transaction, mockCohort), null, mockCohort);
170     }
171
172     private static void assertFailure(final ListenableFuture<?> future, final Exception expCause,
173             final DOMStoreThreePhaseCommitCohort mockCohort) throws Exception {
174         try {
175             future.get(5, TimeUnit.SECONDS);
176             fail("Expected TransactionCommitFailedException");
177         } catch (ExecutionException e) {
178             TransactionCommitFailedException tcf = TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
179             if (expCause != null) {
180                 assertSame("Expected cause", expCause.getClass(), tcf.getCause().getClass());
181             }
182             verify(mockCohort, times(1)).abort();
183         } catch (TimeoutException e) {
184             throw e;
185         }
186     }
187
188     @Test
189     public void testSubmitWithCanCommitException() throws Exception {
190         final Exception cause = new IllegalStateException("mock");
191         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).canCommit();
192         doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
193
194         assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
195     }
196
197     @Test
198     public void testSubmitWithPreCommitException() throws Exception {
199         doReturn(immediateTrueFluentFuture()).when(mockCohort).canCommit();
200         final IllegalStateException cause = new IllegalStateException("mock");
201         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).preCommit();
202         doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
203
204         assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
205     }
206
207     @Test
208     public void testSubmitWithCommitException() throws Exception {
209         doReturn(immediateTrueFluentFuture()).when(mockCohort).canCommit();
210         doReturn(immediateNullFluentFuture()).when(mockCohort).preCommit();
211         final IllegalStateException cause = new IllegalStateException("mock");
212         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort).commit();
213         doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
214
215         assertFailure(coordinator.commit(transaction, mockCohort), cause, mockCohort);
216     }
217
218     @Test
219     public void testSubmitWithAbortException() throws Exception {
220         final Exception canCommitCause = new IllegalStateException("canCommit error");
221         doReturn(Futures.immediateFailedFuture(canCommitCause)).when(mockCohort).canCommit();
222         final Exception abortCause = new IllegalStateException("abort error");
223         doReturn(Futures.immediateFailedFuture(abortCause)).when(mockCohort).abort();
224
225         assertFailure(coordinator.commit(transaction, mockCohort), canCommitCause, mockCohort);
226     }
227
228     @Test
229     public void testCreateReadWriteTransaction() {
230         DOMStore domStore = mock(DOMStore.class);
231         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
232                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
233                 futureExecutor)) {
234             dataBroker.newReadWriteTransaction();
235
236             verify(domStore, never()).newReadWriteTransaction();
237         }
238     }
239
240     @Test
241     public void testCreateWriteOnlyTransaction() {
242         DOMStore domStore = mock(DOMStore.class);
243         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
244                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
245                 futureExecutor)) {
246             dataBroker.newWriteOnlyTransaction();
247
248             verify(domStore, never()).newWriteOnlyTransaction();
249         }
250     }
251
252     @Test
253     public void testCreateReadOnlyTransaction() {
254         DOMStore domStore = mock(DOMStore.class);
255         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
256                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
257                 futureExecutor)) {
258             dataBroker.newReadOnlyTransaction();
259
260             verify(domStore, never()).newReadOnlyTransaction();
261         }
262     }
263
264     @Test
265     public void testLazySubTransactionCreationForReadWriteTransactions() {
266         DOMStore configDomStore = mock(DOMStore.class);
267         DOMStore operationalDomStore = mock(DOMStore.class);
268         DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class);
269
270         doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction();
271         doReturn(storeTxn).when(configDomStore).newReadWriteTransaction();
272
273         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
274                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
275                 configDomStore), futureExecutor)) {
276             DOMDataTreeReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction();
277
278             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(), mock(ContainerNode.class));
279             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(), mock(ContainerNode.class));
280             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of());
281
282             verify(configDomStore, never()).newReadWriteTransaction();
283             verify(operationalDomStore, times(1)).newReadWriteTransaction();
284         }
285
286     }
287
288     @Test
289     public void testLazySubTransactionCreationForWriteOnlyTransactions() {
290         DOMStore configDomStore = mock(DOMStore.class);
291         DOMStore operationalDomStore = mock(DOMStore.class);
292         DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class);
293
294         doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction();
295         doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction();
296
297         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
298                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
299                 configDomStore), futureExecutor)) {
300             DOMDataTreeWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction();
301
302             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(), mock(ContainerNode.class));
303             dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(), mock(ContainerNode.class));
304
305             verify(configDomStore, never()).newWriteOnlyTransaction();
306             verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
307         }
308     }
309
310     @Test
311     public void testLazySubTransactionCreationForReadOnlyTransactions() {
312         DOMStore configDomStore = mock(DOMStore.class);
313         DOMStore operationalDomStore = mock(DOMStore.class);
314         DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class);
315
316         doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction();
317         doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction();
318
319         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
320                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
321                 configDomStore), futureExecutor)) {
322             DOMDataTreeReadTransaction dataTxn = dataBroker.newReadOnlyTransaction();
323
324             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of());
325             dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of());
326
327             verify(configDomStore, never()).newReadOnlyTransaction();
328             verify(operationalDomStore, times(1)).newReadOnlyTransaction();
329         }
330     }
331
332     @Test
333     public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException {
334         DOMStore configDomStore = mock(DOMStore.class);
335         DOMStore operationalDomStore = mock(DOMStore.class);
336         DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
337
338         doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
339         doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
340         doReturn(immediateFalseFluentFuture()).when(mockCohort).canCommit();
341         doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
342
343         final CountDownLatch latch = new CountDownLatch(1);
344         final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
345
346         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
347                 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
348                 configDomStore), futureExecutor) {
349             @Override
350             public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
351                     DOMStoreThreePhaseCommitCohort cohort) {
352                 commitCohorts.add(cohort);
353                 latch.countDown();
354                 return super.commit(writeTx, cohort);
355             }
356         }) {
357             DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
358
359             domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of());
360
361             domDataReadWriteTransaction.commit();
362
363             assertTrue(latch.await(10, TimeUnit.SECONDS));
364
365             assertTrue(commitCohorts.size() == 1);
366         }
367     }
368
369     @Test
370     public void testCreateTransactionChain() {
371         DOMStore domStore = mock(DOMStore.class);
372         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
373                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
374                 futureExecutor)) {
375
376             dataBroker.createTransactionChain();
377
378             verify(domStore, times(2)).createTransactionChain();
379         }
380
381     }
382
383     @Test
384     public void testCreateTransactionOnChain() {
385         DOMStore domStore = mock(DOMStore.class);
386         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
387                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
388                 futureExecutor)) {
389
390             DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
391             DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class);
392
393             doReturn(mockChain).when(domStore).createTransactionChain();
394             doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction();
395
396             DOMTransactionChain transactionChain = dataBroker.createTransactionChain();
397
398             DOMDataTreeWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction();
399
400             verify(mockChain, never()).newWriteOnlyTransaction();
401
402             domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.of(),
403                     mock(ContainerNode.class));
404         }
405     }
406
407     @Test
408     public void testEmptyTransactionSubmitSucceeds() throws ExecutionException, InterruptedException {
409         DOMStore domStore = mock(DOMStore.class);
410         try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
411                 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
412                 futureExecutor)) {
413
414             FluentFuture<? extends CommitInfo> submit1 = dataBroker.newWriteOnlyTransaction().commit();
415
416             assertNotNull(submit1);
417
418             submit1.get();
419
420             FluentFuture<? extends CommitInfo> submit2 = dataBroker.newReadWriteTransaction().commit();
421
422             assertNotNull(submit2);
423
424             submit2.get();
425         }
426     }
427
428     @Test
429     public void testExtensions() {
430         final var mockConfigStore = mock(AbstractDataStore.class);
431         final var mockOperStore = mock(AbstractDataStore.class);
432         try (var dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
433                 LogicalDatastoreType.OPERATIONAL, mockOperStore,
434                 LogicalDatastoreType.CONFIGURATION, mockConfigStore), futureExecutor)) {
435             assertNotNull(dataBroker.extension(DataTreeChangeExtension.class));
436
437             final var cohortRegistry = dataBroker.extension(CommitCohortExtension.class);
438             assertNotNull(cohortRegistry);
439
440             final var cohort = mock(DOMDataTreeCommitCohort.class);
441             final var path = DOMDataTreeIdentifier.of(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.of());
442             cohortRegistry.registerCommitCohort(path, cohort);
443
444             verify(mockConfigStore).registerCommitCohort(path, cohort);
445         }
446     }
447 }