Default AsyncWriteTransaction.submit()
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
17
18 import com.google.common.base.Optional;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.util.concurrent.ForwardingExecutorService;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.ListeningExecutorService;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.Collections;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicReference;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.mockito.Mockito;
39 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
40 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
41 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
42 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
43 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
45 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
46 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
47 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
48 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
49 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
51 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
52 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57
58 public class DOMBrokerTest {
59
60     private SchemaContext schemaContext;
61     private AbstractDOMDataBroker domBroker;
62     private ListeningExecutorService executor;
63     private ExecutorService futureExecutor;
64     private CommitExecutorService commitExecutor;
65
66     @Before
67     public void setupStore() {
68
69         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
70         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService());
71         schemaContext = TestModel.createTestContext();
72
73         operStore.onGlobalContextUpdated(schemaContext);
74         configStore.onGlobalContextUpdated(schemaContext);
75
76         final ImmutableMap<LogicalDatastoreType, DOMStore> stores =
77                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
78                 .put(CONFIGURATION, configStore) //
79                 .put(OPERATIONAL, operStore) //
80                 .build();
81
82         commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
83         futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB", DOMBrokerTest.class);
84         executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
85                                                                  TransactionCommitDeadlockException
86                                                                          .DEADLOCK_EXCEPTION_SUPPLIER,
87                                                                  futureExecutor);
88         domBroker = new SerializedDOMDataBroker(stores, executor);
89     }
90
91     @After
92     public void tearDown() {
93         if (executor != null) {
94             executor.shutdownNow();
95         }
96
97         if (futureExecutor != null) {
98             futureExecutor.shutdownNow();
99         }
100     }
101
102     @Test(timeout = 10000)
103     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
104
105         assertNotNull(domBroker);
106
107         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
108         assertNotNull(readTx);
109
110         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
111         assertNotNull(writeTx);
112         /**
113          *
114          * Writes /test in writeTx
115          *
116          */
117         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
118
119         /**
120          * Reads /test from writeTx Read should return container.
121          *
122          */
123         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
124                 .read(OPERATIONAL, TestModel.TEST_PATH);
125         assertTrue(writeTxContainer.get().isPresent());
126
127         /**
128          * Reads /test from readTx Read should return Absent.
129          *
130          */
131         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
132                 .read(OPERATIONAL, TestModel.TEST_PATH);
133         assertFalse(readTxContainer.get().isPresent());
134     }
135
136     @Test(timeout = 10000)
137     public void testTransactionCommit() throws InterruptedException, ExecutionException, TimeoutException {
138
139         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
140         assertNotNull(writeTx);
141         /**
142          *
143          * Writes /test in writeTx
144          *
145          */
146         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
147
148         /**
149          * Reads /test from writeTx Read should return container.
150          *
151          */
152         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
153                 .read(OPERATIONAL, TestModel.TEST_PATH);
154         assertTrue(writeTxContainer.get().isPresent());
155
156         writeTx.commit().get(5, TimeUnit.SECONDS);
157
158         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
159                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
160         assertTrue(afterCommitRead.isPresent());
161     }
162
163     @Test(timeout = 10000)
164     @Deprecated
165     public void testTransactionSubmit() throws InterruptedException, ExecutionException, TimeoutException {
166
167         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
168         assertNotNull(writeTx);
169         /**
170          *
171          * Writes /test in writeTx
172          *
173          */
174         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
175
176         /**
177          * Reads /test from writeTx Read should return container.
178          *
179          */
180         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
181                 .read(OPERATIONAL, TestModel.TEST_PATH);
182         assertTrue(writeTxContainer.get().isPresent());
183
184         writeTx.submit().get(5, TimeUnit.SECONDS);
185
186         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
187                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
188         assertTrue(afterCommitRead.isPresent());
189     }
190
191     @Test(expected = TransactionCommitFailedException.class)
192     @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
193     public void testRejectedCommit() throws Throwable {
194
195         commitExecutor.delegate = Mockito.mock(ExecutorService.class);
196         Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate)
197                 .execute(Mockito.any(Runnable.class));
198         Mockito.doNothing().when(commitExecutor.delegate).shutdown();
199         Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow();
200         Mockito.doReturn("").when(commitExecutor.delegate).toString();
201         Mockito.doReturn(true).when(commitExecutor.delegate)
202                 .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class));
203
204         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
205         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
206
207         try {
208             writeTx.commit().get(5, TimeUnit.SECONDS);
209         } catch (ExecutionException e) {
210             throw e.getCause();
211         }
212     }
213
214     /**
215      * Tests a simple DataChangeListener notification after a write.
216      */
217     @Test
218     @SuppressWarnings("checkstyle:IllegalThrows")
219     public void testDataChangeListener() throws Throwable {
220
221         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
222
223         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
224
225         domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
226
227         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
228         assertNotNull(writeTx);
229
230         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, testNode);
231
232         AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
233
234         dcListener.waitForChange();
235
236         if (caughtEx.get() != null) {
237             throw caughtEx.get();
238         }
239
240         NormalizedNode<?, ?> actualNode = dcListener.capturedChange.getCreatedData().get(TestModel.TEST_PATH);
241         assertEquals("Created node", testNode, actualNode);
242     }
243
244     /**
245      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
246      * This should succeed without deadlock.
247      */
248     @Test
249     @SuppressWarnings("checkstyle:IllegalThrows")
250     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
251
252         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
253         final CountDownLatch commitCompletedLatch = new CountDownLatch(1);
254
255         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
256             @Override
257             public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
258
259                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
260                 writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
261                 Futures.addCallback(writeTx.submit(), new FutureCallback<Void>() {
262                     @Override
263                     public void onSuccess(final Void result) {
264                         commitCompletedLatch.countDown();
265                     }
266
267                     @Override
268                     public void onFailure(final Throwable throwable) {
269                         caughtCommitEx.set(throwable);
270                         commitCompletedLatch.countDown();
271                     }
272                 }, MoreExecutors.directExecutor());
273
274                 super.onDataChanged(change);
275             }
276         };
277
278         domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
279
280         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
281         assertNotNull(writeTx);
282
283         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
284
285         AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
286
287         dcListener.waitForChange();
288
289         if (caughtEx.get() != null) {
290             throw caughtEx.get();
291         }
292
293         assertTrue("Commit Future was not invoked", commitCompletedLatch.await(5, TimeUnit.SECONDS));
294
295         if (caughtCommitEx.get() != null) {
296             throw caughtCommitEx.get();
297         }
298     }
299
300     /**
301      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
302      * This should throw an exception and not deadlock.
303      */
304     @Test(expected = TransactionCommitDeadlockException.class)
305     @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:IllegalCatch"})
306     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
307
308         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
309
310         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
311             @Override
312             public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
313                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
314                 writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
315                 try {
316                     writeTx.commit().get();
317                 } catch (ExecutionException e) {
318                     caughtCommitEx.set(e.getCause());
319                 } catch (Exception e) {
320                     caughtCommitEx.set(e);
321                 } finally {
322                     super.onDataChanged(change);
323                 }
324             }
325         };
326
327         domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
328
329         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
330         assertNotNull(writeTx);
331
332         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
333
334         AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
335
336         dcListener.waitForChange();
337
338         if (caughtEx.get() != null) {
339             throw caughtEx.get();
340         }
341
342         if (caughtCommitEx.get() != null) {
343             throw caughtCommitEx.get();
344         }
345     }
346
347     @SuppressWarnings("checkstyle:IllegalCatch")
348     AtomicReference<Throwable> submitTxAsync(final DOMDataWriteTransaction writeTx) {
349         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
350         new Thread(() -> {
351             try {
352                 writeTx.commit();
353             } catch (Throwable e) {
354                 caughtEx.set(e);
355             }
356         }).start();
357
358         return caughtEx;
359     }
360
361     static class TestDOMDataChangeListener implements DOMDataChangeListener {
362
363         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> capturedChange;
364         private final CountDownLatch latch = new CountDownLatch(1);
365
366         @Override
367         public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
368             this.capturedChange = change;
369             latch.countDown();
370         }
371
372         void waitForChange() throws InterruptedException {
373             assertTrue("onDataChanged was not called", latch.await(5, TimeUnit.SECONDS));
374         }
375     }
376
377     static class CommitExecutorService extends ForwardingExecutorService {
378
379         ExecutorService delegate;
380
381         CommitExecutorService(final ExecutorService delegate) {
382             this.delegate = delegate;
383         }
384
385         @Override
386         protected ExecutorService delegate() {
387             return delegate;
388         }
389     }
390 }