Merge "Bug#1854 - Exit command in console causing OOM."
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
1 package org.opendaylight.controller.md.sal.dom.broker.impl;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
8 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
9 import com.google.common.base.Optional;
10 import com.google.common.collect.ImmutableMap;
11 import com.google.common.util.concurrent.ForwardingExecutorService;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Collections;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.RejectedExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicReference;
25 import org.junit.After;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.Mockito;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
35 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
36 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
37 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
38 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
39 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
41 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
42 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47
48 public class DOMBrokerTest {
49
50     private SchemaContext schemaContext;
51     private DOMDataBrokerImpl domBroker;
52     private ListeningExecutorService executor;
53     private ExecutorService futureExecutor;
54     private CommitExecutorService commitExecutor;
55
56     @Before
57     public void setupStore() {
58
59         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
60                 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
61         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
62                 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
63         schemaContext = TestModel.createTestContext();
64
65         operStore.onGlobalContextUpdated(schemaContext);
66         configStore.onGlobalContextUpdated(schemaContext);
67
68         ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
69                 .put(CONFIGURATION, configStore) //
70                 .put(OPERATIONAL, operStore) //
71                 .build();
72
73         commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
74         futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
75         executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
76                 TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
77         domBroker = new DOMDataBrokerImpl(stores, executor);
78     }
79
80     @After
81     public void tearDown() {
82         if( executor != null ) {
83             executor.shutdownNow();
84         }
85
86         if(futureExecutor != null) {
87             futureExecutor.shutdownNow();
88         }
89     }
90
91     @Test(timeout=10000)
92     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
93
94         assertNotNull(domBroker);
95
96         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
97         assertNotNull(readTx);
98
99         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
100         assertNotNull(writeTx);
101         /**
102          *
103          * Writes /test in writeTx
104          *
105          */
106         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
107
108         /**
109          *
110          * Reads /test from writeTx Read should return container.
111          *
112          */
113         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
114                 TestModel.TEST_PATH);
115         assertTrue(writeTxContainer.get().isPresent());
116
117         /**
118          *
119          * Reads /test from readTx Read should return Absent.
120          *
121          */
122         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
123                 .read(OPERATIONAL, TestModel.TEST_PATH);
124         assertFalse(readTxContainer.get().isPresent());
125     }
126
127     @Test(timeout=10000)
128     public void testTransactionCommit() throws InterruptedException, ExecutionException {
129
130         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
131         assertNotNull(writeTx);
132         /**
133          *
134          * Writes /test in writeTx
135          *
136          */
137         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
138
139         /**
140          *
141          * Reads /test from writeTx Read should return container.
142          *
143          */
144         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
145                 TestModel.TEST_PATH);
146         assertTrue(writeTxContainer.get().isPresent());
147
148         writeTx.submit().get();
149
150         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
151                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
152         assertTrue(afterCommitRead.isPresent());
153     }
154
155     @Test(expected=TransactionCommitFailedException.class)
156     public void testRejectedCommit() throws Exception {
157
158         commitExecutor.delegate = Mockito.mock( ExecutorService.class );
159         Mockito.doThrow( new RejectedExecutionException( "mock" ) )
160             .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
161         Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
162         Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
163         Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
164         Mockito.doReturn( true ).when( commitExecutor.delegate )
165             .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
166
167         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
168         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
169
170         writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
171     }
172
173     /**
174      * Tests a simple DataChangeListener notification after a write.
175      */
176     @Test
177     public void testDataChangeListener() throws Throwable {
178
179         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
180
181         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
182
183         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
184                                               dcListener, DataChangeScope.BASE );
185
186         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
187         assertNotNull( writeTx );
188
189         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
190
191         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
192
193         dcListener.waitForChange();
194
195         if( caughtEx.get() != null ) {
196             throw caughtEx.get();
197         }
198
199         NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
200         assertEquals( "Created node", testNode, actualNode );
201     }
202
203     /**
204      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
205      * This should succeed without deadlock.
206      */
207     @Test
208     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
209
210         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
211         final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
212
213         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
214             @Override
215             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
216
217                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
218                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
219                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
220                 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
221                     @Override
222                     public void onSuccess( final Void result ) {
223                         commitCompletedLatch.countDown();
224                     }
225
226                     @Override
227                     public void onFailure( final Throwable t ) {
228                         caughtCommitEx.set( t );
229                         commitCompletedLatch.countDown();
230                     }
231                 } );
232
233                 super.onDataChanged( change );
234             }
235         };
236
237         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
238                                               dcListener, DataChangeScope.BASE );
239
240         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
241         assertNotNull( writeTx );
242
243         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
244
245         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
246
247         dcListener.waitForChange();
248
249         if( caughtEx.get() != null ) {
250             throw caughtEx.get();
251         }
252
253         assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
254
255         if( caughtCommitEx.get() != null ) {
256             throw caughtCommitEx.get();
257         }
258     }
259
260     /**
261      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
262      * This should throw an exception and not deadlock.
263      */
264     @Test(expected=TransactionCommitDeadlockException.class)
265     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
266
267         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
268
269         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
270             @Override
271             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
272                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
273                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
274                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
275                 try {
276                     writeTx.submit().get();
277                 } catch( ExecutionException e ) {
278                     caughtCommitEx.set( e.getCause() );
279                 } catch( Exception e ) {
280                     caughtCommitEx.set( e );
281                 }
282                 finally {
283                     super.onDataChanged( change );
284                 }
285             }
286         };
287
288         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
289                                               dcListener, DataChangeScope.BASE );
290
291         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
292         assertNotNull( writeTx );
293
294         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
295
296         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
297
298         dcListener.waitForChange();
299
300         if( caughtEx.get() != null ) {
301             throw caughtEx.get();
302         }
303
304         if( caughtCommitEx.get() != null ) {
305             throw caughtCommitEx.get();
306         }
307     }
308
309     AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
310         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
311         new Thread() {
312             @Override
313             public void run() {
314
315                 try {
316                     writeTx.submit();
317                 } catch( Throwable e ) {
318                     caughtEx.set( e );
319                 }
320             }
321
322         }.start();
323
324         return caughtEx;
325     }
326
327     static class TestDOMDataChangeListener implements DOMDataChangeListener {
328
329         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
330         private final CountDownLatch latch = new CountDownLatch( 1 );
331
332         @Override
333         public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
334             this.change = change;
335             latch.countDown();
336         }
337
338         void waitForChange() throws InterruptedException {
339             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
340         }
341     }
342
343     static class CommitExecutorService extends ForwardingExecutorService {
344
345         ExecutorService delegate;
346
347         public CommitExecutorService( final ExecutorService delegate ) {
348             this.delegate = delegate;
349         }
350
351         @Override
352         protected ExecutorService delegate() {
353             return delegate;
354         }
355     }
356 }