07ca5450feee8a155cc3803847385984968a4dd6
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
17
18 import com.google.common.base.Optional;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.util.concurrent.ForwardingExecutorService;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.ListeningExecutorService;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.Collections;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicReference;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.mockito.Mockito;
38 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
39 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
40 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
41 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
42 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
45 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
46 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
47 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
48 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
50 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
51 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
54 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56
57 public class DOMBrokerTest {
58
59     private SchemaContext schemaContext;
60     private AbstractDOMDataBroker domBroker;
61     private ListeningExecutorService executor;
62     private ExecutorService futureExecutor;
63     private CommitExecutorService commitExecutor;
64
65     @Before
66     public void setupStore() {
67
68         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
69         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService());
70         schemaContext = TestModel.createTestContext();
71
72         operStore.onGlobalContextUpdated(schemaContext);
73         configStore.onGlobalContextUpdated(schemaContext);
74
75         final ImmutableMap<LogicalDatastoreType, DOMStore> stores =
76                 ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
77                 .put(CONFIGURATION, configStore) //
78                 .put(OPERATIONAL, operStore) //
79                 .build();
80
81         commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
82         futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB", DOMBrokerTest.class);
83         executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
84                                                                  TransactionCommitDeadlockException
85                                                                          .DEADLOCK_EXCEPTION_SUPPLIER,
86                                                                  futureExecutor);
87         domBroker = new SerializedDOMDataBroker(stores, executor);
88     }
89
90     @After
91     public void tearDown() {
92         if (executor != null) {
93             executor.shutdownNow();
94         }
95
96         if (futureExecutor != null) {
97             futureExecutor.shutdownNow();
98         }
99     }
100
101     @Test(timeout = 10000)
102     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
103
104         assertNotNull(domBroker);
105
106         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
107         assertNotNull(readTx);
108
109         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
110         assertNotNull(writeTx);
111         /**
112          *
113          * Writes /test in writeTx
114          *
115          */
116         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
117
118         /**
119          * Reads /test from writeTx Read should return container.
120          *
121          */
122         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
123                 .read(OPERATIONAL, TestModel.TEST_PATH);
124         assertTrue(writeTxContainer.get().isPresent());
125
126         /**
127          * Reads /test from readTx Read should return Absent.
128          *
129          */
130         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
131                 .read(OPERATIONAL, TestModel.TEST_PATH);
132         assertFalse(readTxContainer.get().isPresent());
133     }
134
135     @Test(timeout = 10000)
136     public void testTransactionCommit() throws InterruptedException, ExecutionException {
137
138         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
139         assertNotNull(writeTx);
140         /**
141          *
142          * Writes /test in writeTx
143          *
144          */
145         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
146
147         /**
148          * Reads /test from writeTx Read should return container.
149          *
150          */
151         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
152                 .read(OPERATIONAL, TestModel.TEST_PATH);
153         assertTrue(writeTxContainer.get().isPresent());
154
155         writeTx.submit().get();
156
157         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
158                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
159         assertTrue(afterCommitRead.isPresent());
160     }
161
162     @Test(expected = TransactionCommitFailedException.class)
163     public void testRejectedCommit() throws Exception {
164
165         commitExecutor.delegate = Mockito.mock(ExecutorService.class);
166         Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate)
167                 .execute(Mockito.any(Runnable.class));
168         Mockito.doNothing().when(commitExecutor.delegate).shutdown();
169         Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow();
170         Mockito.doReturn("").when(commitExecutor.delegate).toString();
171         Mockito.doReturn(true).when(commitExecutor.delegate)
172                 .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class));
173
174         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
175         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
176
177         writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
178     }
179
180     /**
181      * Tests a simple DataChangeListener notification after a write.
182      */
183     @Test
184     @SuppressWarnings("checkstyle:IllegalThrows")
185     public void testDataChangeListener() throws Throwable {
186
187         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
188
189         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
190
191         domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
192
193         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
194         assertNotNull(writeTx);
195
196         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, testNode);
197
198         AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
199
200         dcListener.waitForChange();
201
202         if (caughtEx.get() != null) {
203             throw caughtEx.get();
204         }
205
206         NormalizedNode<?, ?> actualNode = dcListener.capturedChange.getCreatedData().get(TestModel.TEST_PATH);
207         assertEquals("Created node", testNode, actualNode);
208     }
209
210     /**
211      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
212      * This should succeed without deadlock.
213      */
214     @Test
215     @SuppressWarnings("checkstyle:IllegalThrows")
216     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
217
218         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
219         final CountDownLatch commitCompletedLatch = new CountDownLatch(1);
220
221         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
222             @Override
223             public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
224
225                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
226                 writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
227                 Futures.addCallback(writeTx.submit(), new FutureCallback<Void>() {
228                     @Override
229                     public void onSuccess(final Void result) {
230                         commitCompletedLatch.countDown();
231                     }
232
233                     @Override
234                     public void onFailure(final Throwable throwable) {
235                         caughtCommitEx.set(throwable);
236                         commitCompletedLatch.countDown();
237                     }
238                 });
239
240                 super.onDataChanged(change);
241             }
242         };
243
244         domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
245
246         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
247         assertNotNull(writeTx);
248
249         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
250
251         AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
252
253         dcListener.waitForChange();
254
255         if (caughtEx.get() != null) {
256             throw caughtEx.get();
257         }
258
259         assertTrue("Commit Future was not invoked", commitCompletedLatch.await(5, TimeUnit.SECONDS));
260
261         if (caughtCommitEx.get() != null) {
262             throw caughtCommitEx.get();
263         }
264     }
265
266     /**
267      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
268      * This should throw an exception and not deadlock.
269      */
270     @Test(expected = TransactionCommitDeadlockException.class)
271     @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:IllegalCatch"})
272     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
273
274         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
275
276         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
277             @Override
278             public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
279                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
280                 writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
281                 try {
282                     writeTx.submit().get();
283                 } catch (ExecutionException e) {
284                     caughtCommitEx.set(e.getCause());
285                 } catch (Exception e) {
286                     caughtCommitEx.set(e);
287                 } finally {
288                     super.onDataChanged(change);
289                 }
290             }
291         };
292
293         domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
294
295         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
296         assertNotNull(writeTx);
297
298         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
299
300         AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
301
302         dcListener.waitForChange();
303
304         if (caughtEx.get() != null) {
305             throw caughtEx.get();
306         }
307
308         if (caughtCommitEx.get() != null) {
309             throw caughtCommitEx.get();
310         }
311     }
312
313     @SuppressWarnings("checkstyle:IllegalCatch")
314     AtomicReference<Throwable> submitTxAsync(final DOMDataWriteTransaction writeTx) {
315         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
316         new Thread(() -> {
317             try {
318                 writeTx.submit();
319             } catch (Throwable e) {
320                 caughtEx.set(e);
321             }
322         }).start();
323
324         return caughtEx;
325     }
326
327     static class TestDOMDataChangeListener implements DOMDataChangeListener {
328
329         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> capturedChange;
330         private final CountDownLatch latch = new CountDownLatch(1);
331
332         @Override
333         public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
334             this.capturedChange = change;
335             latch.countDown();
336         }
337
338         void waitForChange() throws InterruptedException {
339             assertTrue("onDataChanged was not called", latch.await(5, TimeUnit.SECONDS));
340         }
341     }
342
343     static class CommitExecutorService extends ForwardingExecutorService {
344
345         ExecutorService delegate;
346
347         CommitExecutorService(final ExecutorService delegate) {
348             this.delegate = delegate;
349         }
350
351         @Override
352         protected ExecutorService delegate() {
353             return delegate;
354         }
355     }
356 }