be07a458b23a02ab6f6216d263792c24dd59d55a
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.util.concurrent.ForwardingExecutorService;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import java.util.Collections;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
38 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
39 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
40 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
41 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
42 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
45 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
46 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
47 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
48 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
49 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
50 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 public class DOMBrokerTest {
57
58     private SchemaContext schemaContext;
59     private AbstractDOMDataBroker domBroker;
60     private ListeningExecutorService executor;
61     private ExecutorService futureExecutor;
62     private CommitExecutorService commitExecutor;
63
64     @Before
65     public void setupStore() {
66
67         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
68                 MoreExecutors.newDirectExecutorService());
69         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
70                 MoreExecutors.newDirectExecutorService());
71         schemaContext = TestModel.createTestContext();
72
73         operStore.onGlobalContextUpdated(schemaContext);
74         configStore.onGlobalContextUpdated(schemaContext);
75
76         ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
77                 .put(CONFIGURATION, configStore) //
78                 .put(OPERATIONAL, operStore) //
79                 .build();
80
81         commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
82         futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
83         executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
84                 TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
85         domBroker = new SerializedDOMDataBroker(stores, executor);
86     }
87
88     @After
89     public void tearDown() {
90         if( executor != null ) {
91             executor.shutdownNow();
92         }
93
94         if(futureExecutor != null) {
95             futureExecutor.shutdownNow();
96         }
97     }
98
99     @Test(timeout=10000)
100     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
101
102         assertNotNull(domBroker);
103
104         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
105         assertNotNull(readTx);
106
107         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
108         assertNotNull(writeTx);
109         /**
110          *
111          * Writes /test in writeTx
112          *
113          */
114         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
115
116         /**
117          *
118          * Reads /test from writeTx Read should return container.
119          *
120          */
121         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
122                 TestModel.TEST_PATH);
123         assertTrue(writeTxContainer.get().isPresent());
124
125         /**
126          *
127          * Reads /test from readTx Read should return Absent.
128          *
129          */
130         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
131                 .read(OPERATIONAL, TestModel.TEST_PATH);
132         assertFalse(readTxContainer.get().isPresent());
133     }
134
135     @Test(timeout=10000)
136     public void testTransactionCommit() throws InterruptedException, ExecutionException {
137
138         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
139         assertNotNull(writeTx);
140         /**
141          *
142          * Writes /test in writeTx
143          *
144          */
145         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
146
147         /**
148          *
149          * Reads /test from writeTx Read should return container.
150          *
151          */
152         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
153                 TestModel.TEST_PATH);
154         assertTrue(writeTxContainer.get().isPresent());
155
156         writeTx.submit().get();
157
158         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
159                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
160         assertTrue(afterCommitRead.isPresent());
161     }
162
163     @Test(expected=TransactionCommitFailedException.class)
164     public void testRejectedCommit() throws Exception {
165
166         commitExecutor.delegate = Mockito.mock( ExecutorService.class );
167         Mockito.doThrow( new RejectedExecutionException( "mock" ) )
168             .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
169         Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
170         Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
171         Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
172         Mockito.doReturn( true ).when( commitExecutor.delegate )
173             .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
174
175         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
176         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
177
178         writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
179     }
180
181     /**
182      * Tests a simple DataChangeListener notification after a write.
183      */
184     @Test
185     public void testDataChangeListener() throws Throwable {
186
187         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
188
189         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
190
191         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
192                                               dcListener, DataChangeScope.BASE );
193
194         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
195         assertNotNull( writeTx );
196
197         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
198
199         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
200
201         dcListener.waitForChange();
202
203         if( caughtEx.get() != null ) {
204             throw caughtEx.get();
205         }
206
207         NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
208         assertEquals( "Created node", testNode, actualNode );
209     }
210
211     /**
212      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
213      * This should succeed without deadlock.
214      */
215     @Test
216     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
217
218         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
219         final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
220
221         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
222             @Override
223             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
224
225                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
226                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
227                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
228                 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
229                     @Override
230                     public void onSuccess( final Void result ) {
231                         commitCompletedLatch.countDown();
232                     }
233
234                     @Override
235                     public void onFailure( final Throwable t ) {
236                         caughtCommitEx.set( t );
237                         commitCompletedLatch.countDown();
238                     }
239                 } );
240
241                 super.onDataChanged( change );
242             }
243         };
244
245         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
246                                               dcListener, DataChangeScope.BASE );
247
248         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
249         assertNotNull( writeTx );
250
251         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
252
253         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
254
255         dcListener.waitForChange();
256
257         if( caughtEx.get() != null ) {
258             throw caughtEx.get();
259         }
260
261         assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
262
263         if( caughtCommitEx.get() != null ) {
264             throw caughtCommitEx.get();
265         }
266     }
267
268     /**
269      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
270      * This should throw an exception and not deadlock.
271      */
272     @Test(expected=TransactionCommitDeadlockException.class)
273     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
274
275         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
276
277         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
278             @Override
279             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
280                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
281                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
282                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
283                 try {
284                     writeTx.submit().get();
285                 } catch( ExecutionException e ) {
286                     caughtCommitEx.set( e.getCause() );
287                 } catch( Exception e ) {
288                     caughtCommitEx.set( e );
289                 }
290                 finally {
291                     super.onDataChanged( change );
292                 }
293             }
294         };
295
296         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
297                                               dcListener, DataChangeScope.BASE );
298
299         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
300         assertNotNull( writeTx );
301
302         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
303
304         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
305
306         dcListener.waitForChange();
307
308         if( caughtEx.get() != null ) {
309             throw caughtEx.get();
310         }
311
312         if( caughtCommitEx.get() != null ) {
313             throw caughtCommitEx.get();
314         }
315     }
316
317     AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
318         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
319         new Thread(() -> {
320             try {
321                 writeTx.submit();
322             } catch (Throwable e) {
323                 caughtEx.set(e);
324             }
325         }).start();
326
327         return caughtEx;
328     }
329
330     static class TestDOMDataChangeListener implements DOMDataChangeListener {
331
332         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
333         private final CountDownLatch latch = new CountDownLatch( 1 );
334
335         @Override
336         public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
337             this.change = change;
338             latch.countDown();
339         }
340
341         void waitForChange() throws InterruptedException {
342             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
343         }
344     }
345
346     static class CommitExecutorService extends ForwardingExecutorService {
347
348         ExecutorService delegate;
349
350         public CommitExecutorService( final ExecutorService delegate ) {
351             this.delegate = delegate;
352         }
353
354         @Override
355         protected ExecutorService delegate() {
356             return delegate;
357         }
358     }
359 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.