Removed ReadWriteTransaction concept from APIs.
[mdsal.git] / dom / mdsal-dom-broker / src / test / java / org / opendaylight / mdsal / dom / broker / test / DOMBrokerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.dom.broker.test;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
17
18 import org.opendaylight.mdsal.dom.broker.test.util.TestModel;
19
20 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStore;
21 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
22 import org.opendaylight.mdsal.common.api.AsyncDataChangeEvent;
23 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
24 import org.opendaylight.mdsal.common.api.TransactionCommitDeadlockException;
25 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
26 import org.opendaylight.mdsal.common.api.AsyncDataBroker.DataChangeScope;
27 import org.opendaylight.mdsal.dom.broker.AbstractDOMDataBroker;
28 import org.opendaylight.mdsal.dom.broker.SerializedDOMDataBroker;
29 import org.opendaylight.mdsal.dom.api.DOMDataChangeListener;
30 import org.opendaylight.mdsal.dom.api.DOMDataReadTransaction;
31 import org.opendaylight.mdsal.dom.api.DOMDataWriteTransaction;
32 import com.google.common.base.Optional;
33 import com.google.common.collect.ImmutableMap;
34 import com.google.common.util.concurrent.ForwardingExecutorService;
35 import com.google.common.util.concurrent.FutureCallback;
36 import com.google.common.util.concurrent.Futures;
37 import com.google.common.util.concurrent.ListenableFuture;
38 import com.google.common.util.concurrent.ListeningExecutorService;
39 import com.google.common.util.concurrent.MoreExecutors;
40 import java.util.Collections;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.RejectedExecutionException;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.After;
49 import org.junit.Before;
50 import org.junit.Test;
51 import org.mockito.Mockito;
52 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
53 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58
59 public class DOMBrokerTest {
60
61     private SchemaContext schemaContext;
62     private AbstractDOMDataBroker domBroker;
63     private ListeningExecutorService executor;
64     private ExecutorService futureExecutor;
65     private CommitExecutorService commitExecutor;
66
67     @Before
68     public void setupStore() {
69
70         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
71                 MoreExecutors.newDirectExecutorService());
72         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
73                 MoreExecutors.newDirectExecutorService());
74         schemaContext = TestModel.createTestContext();
75
76         operStore.onGlobalContextUpdated(schemaContext);
77         configStore.onGlobalContextUpdated(schemaContext);
78
79         ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
80                 .put(CONFIGURATION, configStore) //
81                 .put(OPERATIONAL, operStore) //
82                 .build();
83
84         commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
85         futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
86         executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
87                 TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
88         domBroker = new SerializedDOMDataBroker(stores, executor);
89     }
90
91     @After
92     public void tearDown() {
93         if( executor != null ) {
94             executor.shutdownNow();
95         }
96
97         if(futureExecutor != null) {
98             futureExecutor.shutdownNow();
99         }
100     }
101
102     @Test(timeout=10000)
103     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
104
105         assertNotNull(domBroker);
106
107         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
108         assertNotNull(readTx);
109
110         DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
111         assertNotNull(writeTx);
112         /**
113          *
114          * Writes /test in writeTx
115          *
116          */
117         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
118
119
120         /**
121          *
122          * Reads /test from readTx Read should return Absent.
123          *
124          */
125         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
126                 .read(OPERATIONAL, TestModel.TEST_PATH);
127         assertFalse(readTxContainer.get().isPresent());
128     }
129
130     @Test(timeout=10000)
131     public void testTransactionCommit() throws InterruptedException, ExecutionException {
132
133         DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
134         assertNotNull(writeTx);
135         /**
136          *
137          * Writes /test in writeTx
138          *
139          */
140         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
141
142         writeTx.submit().get();
143
144         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
145                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
146         assertTrue(afterCommitRead.isPresent());
147     }
148
149     @Test(expected=TransactionCommitFailedException.class)
150     public void testRejectedCommit() throws Exception {
151
152         commitExecutor.delegate = Mockito.mock( ExecutorService.class );
153         Mockito.doThrow( new RejectedExecutionException( "mock" ) )
154             .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
155         Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
156         Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
157         Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
158         Mockito.doReturn( true ).when( commitExecutor.delegate )
159             .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
160
161         DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
162         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
163
164         writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
165     }
166
167     /**
168      * Tests a simple DataChangeListener notification after a write.
169      */
170     @Test
171     public void testDataChangeListener() throws Throwable {
172
173         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
174
175         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
176
177         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
178                                               dcListener, DataChangeScope.BASE );
179
180         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
181         assertNotNull( writeTx );
182
183         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
184
185         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
186
187         dcListener.waitForChange();
188
189         if( caughtEx.get() != null ) {
190             throw caughtEx.get();
191         }
192
193         NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
194         assertEquals( "Created node", testNode, actualNode );
195     }
196
197     /**
198      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
199      * This should succeed without deadlock.
200      */
201     @Test
202     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
203
204         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
205         final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
206
207         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
208             @Override
209             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
210
211                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
212                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
213                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
214                 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
215                     @Override
216                     public void onSuccess( final Void result ) {
217                         commitCompletedLatch.countDown();
218                     }
219
220                     @Override
221                     public void onFailure( final Throwable t ) {
222                         caughtCommitEx.set( t );
223                         commitCompletedLatch.countDown();
224                     }
225                 } );
226
227                 super.onDataChanged( change );
228             }
229         };
230
231         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
232                                               dcListener, DataChangeScope.BASE );
233
234         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
235         assertNotNull( writeTx );
236
237         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
238
239         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
240
241         dcListener.waitForChange();
242
243         if( caughtEx.get() != null ) {
244             throw caughtEx.get();
245         }
246
247         assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
248
249         if( caughtCommitEx.get() != null ) {
250             throw caughtCommitEx.get();
251         }
252     }
253
254     /**
255      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
256      * This should throw an exception and not deadlock.
257      */
258     @Test(expected=TransactionCommitDeadlockException.class)
259     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
260
261         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
262
263         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
264             @Override
265             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
266                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
267                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
268                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
269                 try {
270                     writeTx.submit().get();
271                 } catch( ExecutionException e ) {
272                     caughtCommitEx.set( e.getCause() );
273                 } catch( Exception e ) {
274                     caughtCommitEx.set( e );
275                 }
276                 finally {
277                     super.onDataChanged( change );
278                 }
279             }
280         };
281
282         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
283                                               dcListener, DataChangeScope.BASE );
284
285         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
286         assertNotNull( writeTx );
287
288         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
289
290         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
291
292         dcListener.waitForChange();
293
294         if( caughtEx.get() != null ) {
295             throw caughtEx.get();
296         }
297
298         if( caughtCommitEx.get() != null ) {
299             throw caughtCommitEx.get();
300         }
301     }
302
303     AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
304         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
305         new Thread() {
306             @Override
307             public void run() {
308
309                 try {
310                     writeTx.submit();
311                 } catch( Throwable e ) {
312                     caughtEx.set( e );
313                 }
314             }
315
316         }.start();
317
318         return caughtEx;
319     }
320
321     static class TestDOMDataChangeListener implements DOMDataChangeListener {
322
323         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
324         private final CountDownLatch latch = new CountDownLatch( 1 );
325
326         @Override
327         public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
328             this.change = change;
329             latch.countDown();
330         }
331
332         void waitForChange() throws InterruptedException {
333             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
334         }
335     }
336
337     static class CommitExecutorService extends ForwardingExecutorService {
338
339         ExecutorService delegate;
340
341         public CommitExecutorService( final ExecutorService delegate ) {
342             this.delegate = delegate;
343         }
344
345         @Override
346         protected ExecutorService delegate() {
347             return delegate;
348         }
349     }
350 }