Merge "Neutron LBaaS v2.0 API support"
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
1 package org.opendaylight.controller.md.sal.dom.broker.impl;
2
3 import static org.junit.Assert.assertFalse;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.assertEquals;
7 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
8 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
9
10 import java.util.concurrent.CountDownLatch;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.atomic.AtomicReference;
15
16 import org.junit.After;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
23 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
24 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
25 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
27 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
28 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
30 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35
36 import com.google.common.base.Optional;
37 import com.google.common.collect.ImmutableMap;
38 import com.google.common.util.concurrent.FutureCallback;
39 import com.google.common.util.concurrent.Futures;
40 import com.google.common.util.concurrent.ListenableFuture;
41 import com.google.common.util.concurrent.ListeningExecutorService;
42 import com.google.common.util.concurrent.MoreExecutors;
43
44 public class DOMBrokerTest {
45
46     private SchemaContext schemaContext;
47     private DOMDataBrokerImpl domBroker;
48     private ListeningExecutorService executor;
49
50     @Before
51     public void setupStore() {
52         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
53         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
54         schemaContext = TestModel.createTestContext();
55
56         operStore.onGlobalContextUpdated(schemaContext);
57         configStore.onGlobalContextUpdated(schemaContext);
58
59         ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
60                 .put(CONFIGURATION, configStore) //
61                 .put(OPERATIONAL, operStore) //
62                 .build();
63
64         executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
65                                           TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
66         domBroker = new DOMDataBrokerImpl(stores, executor);
67     }
68
69     @After
70     public void tearDown() {
71         if( executor != null ) {
72             executor.shutdownNow();
73         }
74     }
75
76     @Test(timeout=10000)
77     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
78
79         assertNotNull(domBroker);
80
81         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
82         assertNotNull(readTx);
83
84         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
85         assertNotNull(writeTx);
86         /**
87          *
88          * Writes /test in writeTx
89          *
90          */
91         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
92
93         /**
94          *
95          * Reads /test from writeTx Read should return container.
96          *
97          */
98         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
99                 TestModel.TEST_PATH);
100         assertTrue(writeTxContainer.get().isPresent());
101
102         /**
103          *
104          * Reads /test from readTx Read should return Absent.
105          *
106          */
107         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
108                 .read(OPERATIONAL, TestModel.TEST_PATH);
109         assertFalse(readTxContainer.get().isPresent());
110     }
111
112     @Test(timeout=10000)
113     public void testTransactionCommit() throws InterruptedException, ExecutionException {
114
115         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
116         assertNotNull(writeTx);
117         /**
118          *
119          * Writes /test in writeTx
120          *
121          */
122         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
123
124         /**
125          *
126          * Reads /test from writeTx Read should return container.
127          *
128          */
129         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
130                 TestModel.TEST_PATH);
131         assertTrue(writeTxContainer.get().isPresent());
132
133         writeTx.submit().get();
134
135         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
136                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
137         assertTrue(afterCommitRead.isPresent());
138     }
139
140     /**
141      * Tests a simple DataChangeListener notification after a write.
142      */
143     @Test
144     public void testDataChangeListener() throws Throwable {
145
146         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
147
148         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
149
150         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
151                                               dcListener, DataChangeScope.BASE );
152
153         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
154         assertNotNull( writeTx );
155
156         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
157
158         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
159
160         dcListener.waitForChange();
161
162         if( caughtEx.get() != null ) {
163             throw caughtEx.get();
164         }
165
166         NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
167         assertEquals( "Created node", testNode, actualNode );
168     }
169
170     /**
171      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
172      * This should succeed without deadlock.
173      */
174     @Test
175     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
176
177         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
178         final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
179
180         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
181             @Override
182             public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
183
184                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
185                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
186                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
187                 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
188                     @Override
189                     public void onSuccess( Void result ) {
190                         commitCompletedLatch.countDown();
191                     }
192
193                     @Override
194                     public void onFailure( Throwable t ) {
195                         caughtCommitEx.set( t );
196                         commitCompletedLatch.countDown();
197                     }
198                 } );
199
200                 super.onDataChanged( change );
201             }
202         };
203
204         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
205                                               dcListener, DataChangeScope.BASE );
206
207         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
208         assertNotNull( writeTx );
209
210         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
211
212         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
213
214         dcListener.waitForChange();
215
216         if( caughtEx.get() != null ) {
217             throw caughtEx.get();
218         }
219
220         assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
221
222         if( caughtCommitEx.get() != null ) {
223             throw caughtCommitEx.get();
224         }
225     }
226
227     /**
228      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
229      * This should throw an exception and not deadlock.
230      */
231     @Test(expected=TransactionCommitDeadlockException.class)
232     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
233
234         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
235
236         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
237             @Override
238             public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
239                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
240                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
241                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
242                 try {
243                     writeTx.submit().get();
244                 } catch( ExecutionException e ) {
245                     caughtCommitEx.set( e.getCause() );
246                 } catch( Exception e ) {
247                     caughtCommitEx.set( e );
248                 }
249                 finally {
250                     super.onDataChanged( change );
251                 }
252             }
253         };
254
255         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
256                                               dcListener, DataChangeScope.BASE );
257
258         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
259         assertNotNull( writeTx );
260
261         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
262
263         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
264
265         dcListener.waitForChange();
266
267         if( caughtEx.get() != null ) {
268             throw caughtEx.get();
269         }
270
271         if( caughtCommitEx.get() != null ) {
272             throw caughtCommitEx.get();
273         }
274     }
275
276     AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
277         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
278         new Thread() {
279             @Override
280             public void run() {
281
282                 try {
283                     writeTx.submit();
284                 } catch( Throwable e ) {
285                     caughtEx.set( e );
286                 }
287             }
288
289         }.start();
290
291         return caughtEx;
292     }
293
294     static class TestDOMDataChangeListener implements DOMDataChangeListener {
295
296         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
297         private final CountDownLatch latch = new CountDownLatch( 1 );
298
299         @Override
300         public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
301             this.change = change;
302             latch.countDown();
303         }
304
305         void waitForChange() throws InterruptedException {
306             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
307         }
308     }
309 }