Renamed controller.md.sal.dom.api to mdsal.dom.api
[mdsal.git] / dom / mdsal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
17
18 import org.opendaylight.mdsal.dom.api.DOMDataChangeListener;
19 import org.opendaylight.mdsal.dom.api.DOMDataReadTransaction;
20 import org.opendaylight.mdsal.dom.api.DOMDataReadWriteTransaction;
21 import org.opendaylight.mdsal.dom.api.DOMDataWriteTransaction;
22
23 import com.google.common.base.Optional;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.util.concurrent.ForwardingExecutorService;
26 import com.google.common.util.concurrent.FutureCallback;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.ListeningExecutorService;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import java.util.Collections;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
44 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
45 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
46 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
47 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
48 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
49 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
51 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
52 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57
58 public class DOMBrokerTest {
59
60     private SchemaContext schemaContext;
61     private AbstractDOMDataBroker domBroker;
62     private ListeningExecutorService executor;
63     private ExecutorService futureExecutor;
64     private CommitExecutorService commitExecutor;
65
66     @Before
67     public void setupStore() {
68
69         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
70                 MoreExecutors.newDirectExecutorService());
71         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
72                 MoreExecutors.newDirectExecutorService());
73         schemaContext = TestModel.createTestContext();
74
75         operStore.onGlobalContextUpdated(schemaContext);
76         configStore.onGlobalContextUpdated(schemaContext);
77
78         ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
79                 .put(CONFIGURATION, configStore) //
80                 .put(OPERATIONAL, operStore) //
81                 .build();
82
83         commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
84         futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
85         executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
86                 TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
87         domBroker = new SerializedDOMDataBroker(stores, executor);
88     }
89
90     @After
91     public void tearDown() {
92         if( executor != null ) {
93             executor.shutdownNow();
94         }
95
96         if(futureExecutor != null) {
97             futureExecutor.shutdownNow();
98         }
99     }
100
101     @Test(timeout=10000)
102     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
103
104         assertNotNull(domBroker);
105
106         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
107         assertNotNull(readTx);
108
109         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
110         assertNotNull(writeTx);
111         /**
112          *
113          * Writes /test in writeTx
114          *
115          */
116         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
117
118         /**
119          *
120          * Reads /test from writeTx Read should return container.
121          *
122          */
123         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
124                 TestModel.TEST_PATH);
125         assertTrue(writeTxContainer.get().isPresent());
126
127         /**
128          *
129          * Reads /test from readTx Read should return Absent.
130          *
131          */
132         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
133                 .read(OPERATIONAL, TestModel.TEST_PATH);
134         assertFalse(readTxContainer.get().isPresent());
135     }
136
137     @Test(timeout=10000)
138     public void testTransactionCommit() throws InterruptedException, ExecutionException {
139
140         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
141         assertNotNull(writeTx);
142         /**
143          *
144          * Writes /test in writeTx
145          *
146          */
147         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
148
149         /**
150          *
151          * Reads /test from writeTx Read should return container.
152          *
153          */
154         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
155                 TestModel.TEST_PATH);
156         assertTrue(writeTxContainer.get().isPresent());
157
158         writeTx.submit().get();
159
160         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
161                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
162         assertTrue(afterCommitRead.isPresent());
163     }
164
165     @Test(expected=TransactionCommitFailedException.class)
166     public void testRejectedCommit() throws Exception {
167
168         commitExecutor.delegate = Mockito.mock( ExecutorService.class );
169         Mockito.doThrow( new RejectedExecutionException( "mock" ) )
170             .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
171         Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
172         Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
173         Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
174         Mockito.doReturn( true ).when( commitExecutor.delegate )
175             .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
176
177         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
178         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
179
180         writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
181     }
182
183     /**
184      * Tests a simple DataChangeListener notification after a write.
185      */
186     @Test
187     public void testDataChangeListener() throws Throwable {
188
189         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
190
191         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
192
193         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
194                                               dcListener, DataChangeScope.BASE );
195
196         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
197         assertNotNull( writeTx );
198
199         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
200
201         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
202
203         dcListener.waitForChange();
204
205         if( caughtEx.get() != null ) {
206             throw caughtEx.get();
207         }
208
209         NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
210         assertEquals( "Created node", testNode, actualNode );
211     }
212
213     /**
214      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
215      * This should succeed without deadlock.
216      */
217     @Test
218     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
219
220         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
221         final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
222
223         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
224             @Override
225             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
226
227                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
228                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
229                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
230                 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
231                     @Override
232                     public void onSuccess( final Void result ) {
233                         commitCompletedLatch.countDown();
234                     }
235
236                     @Override
237                     public void onFailure( final Throwable t ) {
238                         caughtCommitEx.set( t );
239                         commitCompletedLatch.countDown();
240                     }
241                 } );
242
243                 super.onDataChanged( change );
244             }
245         };
246
247         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
248                                               dcListener, DataChangeScope.BASE );
249
250         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
251         assertNotNull( writeTx );
252
253         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
254
255         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
256
257         dcListener.waitForChange();
258
259         if( caughtEx.get() != null ) {
260             throw caughtEx.get();
261         }
262
263         assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
264
265         if( caughtCommitEx.get() != null ) {
266             throw caughtCommitEx.get();
267         }
268     }
269
270     /**
271      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
272      * This should throw an exception and not deadlock.
273      */
274     @Test(expected=TransactionCommitDeadlockException.class)
275     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
276
277         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
278
279         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
280             @Override
281             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
282                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
283                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
284                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
285                 try {
286                     writeTx.submit().get();
287                 } catch( ExecutionException e ) {
288                     caughtCommitEx.set( e.getCause() );
289                 } catch( Exception e ) {
290                     caughtCommitEx.set( e );
291                 }
292                 finally {
293                     super.onDataChanged( change );
294                 }
295             }
296         };
297
298         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
299                                               dcListener, DataChangeScope.BASE );
300
301         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
302         assertNotNull( writeTx );
303
304         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
305
306         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
307
308         dcListener.waitForChange();
309
310         if( caughtEx.get() != null ) {
311             throw caughtEx.get();
312         }
313
314         if( caughtCommitEx.get() != null ) {
315             throw caughtCommitEx.get();
316         }
317     }
318
319     AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
320         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
321         new Thread() {
322             @Override
323             public void run() {
324
325                 try {
326                     writeTx.submit();
327                 } catch( Throwable e ) {
328                     caughtEx.set( e );
329                 }
330             }
331
332         }.start();
333
334         return caughtEx;
335     }
336
337     static class TestDOMDataChangeListener implements DOMDataChangeListener {
338
339         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
340         private final CountDownLatch latch = new CountDownLatch( 1 );
341
342         @Override
343         public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
344             this.change = change;
345             latch.countDown();
346         }
347
348         void waitForChange() throws InterruptedException {
349             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
350         }
351     }
352
353     static class CommitExecutorService extends ForwardingExecutorService {
354
355         ExecutorService delegate;
356
357         public CommitExecutorService( final ExecutorService delegate ) {
358             this.delegate = delegate;
359         }
360
361         @Override
362         protected ExecutorService delegate() {
363             return delegate;
364         }
365     }
366 }