Bug#1854 - Exit command in console causing OOM.
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
1 package org.opendaylight.controller.md.sal.dom.broker.impl;
2
3 import static org.junit.Assert.assertFalse;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.assertEquals;
7 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
8 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
9
10 import java.util.Collections;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.RejectedExecutionException;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.atomic.AtomicReference;
18
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.Mockito;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
32 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
33 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
35 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
36 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41
42 import com.google.common.base.Optional;
43 import com.google.common.collect.ImmutableMap;
44 import com.google.common.util.concurrent.ForwardingExecutorService;
45 import com.google.common.util.concurrent.FutureCallback;
46 import com.google.common.util.concurrent.Futures;
47 import com.google.common.util.concurrent.ListenableFuture;
48 import com.google.common.util.concurrent.ListeningExecutorService;
49 import com.google.common.util.concurrent.MoreExecutors;
50
51 public class DOMBrokerTest {
52
53     private SchemaContext schemaContext;
54     private DOMDataBrokerImpl domBroker;
55     private ListeningExecutorService executor;
56     private ExecutorService futureExecutor;
57     private CommitExecutorService commitExecutor;
58
59     @Before
60     public void setupStore() {
61
62         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
63                 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
64         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
65                 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
66         schemaContext = TestModel.createTestContext();
67
68         operStore.onGlobalContextUpdated(schemaContext);
69         configStore.onGlobalContextUpdated(schemaContext);
70
71         ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
72                 .put(CONFIGURATION, configStore) //
73                 .put(OPERATIONAL, operStore) //
74                 .build();
75
76         commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
77         futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
78         executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
79                 TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor);
80         domBroker = new DOMDataBrokerImpl(stores, executor);
81     }
82
83     @After
84     public void tearDown() {
85         if( executor != null ) {
86             executor.shutdownNow();
87         }
88
89         if(futureExecutor != null) {
90             futureExecutor.shutdownNow();
91         }
92     }
93
94     @Test(timeout=10000)
95     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
96
97         assertNotNull(domBroker);
98
99         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
100         assertNotNull(readTx);
101
102         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
103         assertNotNull(writeTx);
104         /**
105          *
106          * Writes /test in writeTx
107          *
108          */
109         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
110
111         /**
112          *
113          * Reads /test from writeTx Read should return container.
114          *
115          */
116         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
117                 TestModel.TEST_PATH);
118         assertTrue(writeTxContainer.get().isPresent());
119
120         /**
121          *
122          * Reads /test from readTx Read should return Absent.
123          *
124          */
125         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
126                 .read(OPERATIONAL, TestModel.TEST_PATH);
127         assertFalse(readTxContainer.get().isPresent());
128     }
129
130     @Test(timeout=10000)
131     public void testTransactionCommit() throws InterruptedException, ExecutionException {
132
133         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
134         assertNotNull(writeTx);
135         /**
136          *
137          * Writes /test in writeTx
138          *
139          */
140         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
141
142         /**
143          *
144          * Reads /test from writeTx Read should return container.
145          *
146          */
147         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
148                 TestModel.TEST_PATH);
149         assertTrue(writeTxContainer.get().isPresent());
150
151         writeTx.submit().get();
152
153         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
154                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
155         assertTrue(afterCommitRead.isPresent());
156     }
157
158     @Test(expected=TransactionCommitFailedException.class)
159     public void testRejectedCommit() throws Exception {
160
161         commitExecutor.delegate = Mockito.mock( ExecutorService.class );
162         Mockito.doThrow( new RejectedExecutionException( "mock" ) )
163             .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
164         Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
165         Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
166         Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
167         Mockito.doReturn( true ).when( commitExecutor.delegate )
168             .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
169
170         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
171         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
172
173         writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
174     }
175
176     /**
177      * Tests a simple DataChangeListener notification after a write.
178      */
179     @Test
180     public void testDataChangeListener() throws Throwable {
181
182         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
183
184         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
185
186         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
187                                               dcListener, DataChangeScope.BASE );
188
189         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
190         assertNotNull( writeTx );
191
192         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
193
194         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
195
196         dcListener.waitForChange();
197
198         if( caughtEx.get() != null ) {
199             throw caughtEx.get();
200         }
201
202         NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
203         assertEquals( "Created node", testNode, actualNode );
204     }
205
206     /**
207      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
208      * This should succeed without deadlock.
209      */
210     @Test
211     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
212
213         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
214         final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
215
216         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
217             @Override
218             public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
219
220                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
221                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
222                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
223                 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
224                     @Override
225                     public void onSuccess( Void result ) {
226                         commitCompletedLatch.countDown();
227                     }
228
229                     @Override
230                     public void onFailure( Throwable t ) {
231                         caughtCommitEx.set( t );
232                         commitCompletedLatch.countDown();
233                     }
234                 } );
235
236                 super.onDataChanged( change );
237             }
238         };
239
240         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
241                                               dcListener, DataChangeScope.BASE );
242
243         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
244         assertNotNull( writeTx );
245
246         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
247
248         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
249
250         dcListener.waitForChange();
251
252         if( caughtEx.get() != null ) {
253             throw caughtEx.get();
254         }
255
256         assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
257
258         if( caughtCommitEx.get() != null ) {
259             throw caughtCommitEx.get();
260         }
261     }
262
263     /**
264      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
265      * This should throw an exception and not deadlock.
266      */
267     @Test(expected=TransactionCommitDeadlockException.class)
268     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
269
270         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
271
272         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
273             @Override
274             public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
275                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
276                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
277                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
278                 try {
279                     writeTx.submit().get();
280                 } catch( ExecutionException e ) {
281                     caughtCommitEx.set( e.getCause() );
282                 } catch( Exception e ) {
283                     caughtCommitEx.set( e );
284                 }
285                 finally {
286                     super.onDataChanged( change );
287                 }
288             }
289         };
290
291         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
292                                               dcListener, DataChangeScope.BASE );
293
294         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
295         assertNotNull( writeTx );
296
297         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
298
299         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
300
301         dcListener.waitForChange();
302
303         if( caughtEx.get() != null ) {
304             throw caughtEx.get();
305         }
306
307         if( caughtCommitEx.get() != null ) {
308             throw caughtCommitEx.get();
309         }
310     }
311
312     AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
313         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
314         new Thread() {
315             @Override
316             public void run() {
317
318                 try {
319                     writeTx.submit();
320                 } catch( Throwable e ) {
321                     caughtEx.set( e );
322                 }
323             }
324
325         }.start();
326
327         return caughtEx;
328     }
329
330     static class TestDOMDataChangeListener implements DOMDataChangeListener {
331
332         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
333         private final CountDownLatch latch = new CountDownLatch( 1 );
334
335         @Override
336         public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
337             this.change = change;
338             latch.countDown();
339         }
340
341         void waitForChange() throws InterruptedException {
342             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
343         }
344     }
345
346     static class CommitExecutorService extends ForwardingExecutorService {
347
348         ExecutorService delegate;
349
350         public CommitExecutorService( ExecutorService delegate ) {
351             this.delegate = delegate;
352         }
353
354         @Override
355         protected ExecutorService delegate() {
356             return delegate;
357         }
358     }
359 }