Renamed controller.md.sal.dom.store.impl -> mdsal.dom.store.inmemory
[mdsal.git] / dom / mdsal-dom-broker / src / test / java / org / opendaylight / controller / md / sal / dom / broker / impl / DOMBrokerTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.OPERATIONAL;
17
18 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStore;
19
20 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
21 import org.opendaylight.mdsal.common.api.AsyncDataChangeEvent;
22 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
23 import org.opendaylight.mdsal.common.api.TransactionCommitDeadlockException;
24 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
25 import org.opendaylight.mdsal.common.api.AsyncDataBroker.DataChangeScope;
26 import org.opendaylight.mdsal.dom.broker.AbstractDOMDataBroker;
27 import org.opendaylight.mdsal.dom.broker.SerializedDOMDataBroker;
28 import org.opendaylight.mdsal.dom.api.DOMDataChangeListener;
29 import org.opendaylight.mdsal.dom.api.DOMDataReadTransaction;
30 import org.opendaylight.mdsal.dom.api.DOMDataReadWriteTransaction;
31 import org.opendaylight.mdsal.dom.api.DOMDataWriteTransaction;
32 import com.google.common.base.Optional;
33 import com.google.common.collect.ImmutableMap;
34 import com.google.common.util.concurrent.ForwardingExecutorService;
35 import com.google.common.util.concurrent.FutureCallback;
36 import com.google.common.util.concurrent.Futures;
37 import com.google.common.util.concurrent.ListenableFuture;
38 import com.google.common.util.concurrent.ListeningExecutorService;
39 import com.google.common.util.concurrent.MoreExecutors;
40 import java.util.Collections;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.ExecutionException;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.RejectedExecutionException;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.After;
49 import org.junit.Before;
50 import org.junit.Test;
51 import org.mockito.Mockito;
52 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
53 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
54 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
57 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59
60 public class DOMBrokerTest {
61
62     private SchemaContext schemaContext;
63     private AbstractDOMDataBroker domBroker;
64     private ListeningExecutorService executor;
65     private ExecutorService futureExecutor;
66     private CommitExecutorService commitExecutor;
67
68     @Before
69     public void setupStore() {
70
71         InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
72                 MoreExecutors.newDirectExecutorService());
73         InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
74                 MoreExecutors.newDirectExecutorService());
75         schemaContext = TestModel.createTestContext();
76
77         operStore.onGlobalContextUpdated(schemaContext);
78         configStore.onGlobalContextUpdated(schemaContext);
79
80         ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
81                 .put(CONFIGURATION, configStore) //
82                 .put(OPERATIONAL, operStore) //
83                 .build();
84
85         commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
86         futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
87         executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
88                 TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
89         domBroker = new SerializedDOMDataBroker(stores, executor);
90     }
91
92     @After
93     public void tearDown() {
94         if( executor != null ) {
95             executor.shutdownNow();
96         }
97
98         if(futureExecutor != null) {
99             futureExecutor.shutdownNow();
100         }
101     }
102
103     @Test(timeout=10000)
104     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
105
106         assertNotNull(domBroker);
107
108         DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
109         assertNotNull(readTx);
110
111         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
112         assertNotNull(writeTx);
113         /**
114          *
115          * Writes /test in writeTx
116          *
117          */
118         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
119
120         /**
121          *
122          * Reads /test from writeTx Read should return container.
123          *
124          */
125         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
126                 TestModel.TEST_PATH);
127         assertTrue(writeTxContainer.get().isPresent());
128
129         /**
130          *
131          * Reads /test from readTx Read should return Absent.
132          *
133          */
134         ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
135                 .read(OPERATIONAL, TestModel.TEST_PATH);
136         assertFalse(readTxContainer.get().isPresent());
137     }
138
139     @Test(timeout=10000)
140     public void testTransactionCommit() throws InterruptedException, ExecutionException {
141
142         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
143         assertNotNull(writeTx);
144         /**
145          *
146          * Writes /test in writeTx
147          *
148          */
149         writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
150
151         /**
152          *
153          * Reads /test from writeTx Read should return container.
154          *
155          */
156         ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
157                 TestModel.TEST_PATH);
158         assertTrue(writeTxContainer.get().isPresent());
159
160         writeTx.submit().get();
161
162         Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
163                 .read(OPERATIONAL, TestModel.TEST_PATH).get();
164         assertTrue(afterCommitRead.isPresent());
165     }
166
167     @Test(expected=TransactionCommitFailedException.class)
168     public void testRejectedCommit() throws Exception {
169
170         commitExecutor.delegate = Mockito.mock( ExecutorService.class );
171         Mockito.doThrow( new RejectedExecutionException( "mock" ) )
172             .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
173         Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
174         Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
175         Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
176         Mockito.doReturn( true ).when( commitExecutor.delegate )
177             .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
178
179         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
180         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
181
182         writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
183     }
184
185     /**
186      * Tests a simple DataChangeListener notification after a write.
187      */
188     @Test
189     public void testDataChangeListener() throws Throwable {
190
191         final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
192
193         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
194
195         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
196                                               dcListener, DataChangeScope.BASE );
197
198         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
199         assertNotNull( writeTx );
200
201         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
202
203         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
204
205         dcListener.waitForChange();
206
207         if( caughtEx.get() != null ) {
208             throw caughtEx.get();
209         }
210
211         NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
212         assertEquals( "Created node", testNode, actualNode );
213     }
214
215     /**
216      * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
217      * This should succeed without deadlock.
218      */
219     @Test
220     public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
221
222         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
223         final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
224
225         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
226             @Override
227             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
228
229                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
230                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
231                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
232                 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
233                     @Override
234                     public void onSuccess( final Void result ) {
235                         commitCompletedLatch.countDown();
236                     }
237
238                     @Override
239                     public void onFailure( final Throwable t ) {
240                         caughtCommitEx.set( t );
241                         commitCompletedLatch.countDown();
242                     }
243                 } );
244
245                 super.onDataChanged( change );
246             }
247         };
248
249         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
250                                               dcListener, DataChangeScope.BASE );
251
252         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
253         assertNotNull( writeTx );
254
255         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
256
257         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
258
259         dcListener.waitForChange();
260
261         if( caughtEx.get() != null ) {
262             throw caughtEx.get();
263         }
264
265         assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
266
267         if( caughtCommitEx.get() != null ) {
268             throw caughtCommitEx.get();
269         }
270     }
271
272     /**
273      * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
274      * This should throw an exception and not deadlock.
275      */
276     @Test(expected=TransactionCommitDeadlockException.class)
277     public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
278
279         final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
280
281         TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
282             @Override
283             public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
284                 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
285                 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
286                              ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
287                 try {
288                     writeTx.submit().get();
289                 } catch( ExecutionException e ) {
290                     caughtCommitEx.set( e.getCause() );
291                 } catch( Exception e ) {
292                     caughtCommitEx.set( e );
293                 }
294                 finally {
295                     super.onDataChanged( change );
296                 }
297             }
298         };
299
300         domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
301                                               dcListener, DataChangeScope.BASE );
302
303         final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
304         assertNotNull( writeTx );
305
306         writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
307
308         AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
309
310         dcListener.waitForChange();
311
312         if( caughtEx.get() != null ) {
313             throw caughtEx.get();
314         }
315
316         if( caughtCommitEx.get() != null ) {
317             throw caughtCommitEx.get();
318         }
319     }
320
321     AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
322         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
323         new Thread() {
324             @Override
325             public void run() {
326
327                 try {
328                     writeTx.submit();
329                 } catch( Throwable e ) {
330                     caughtEx.set( e );
331                 }
332             }
333
334         }.start();
335
336         return caughtEx;
337     }
338
339     static class TestDOMDataChangeListener implements DOMDataChangeListener {
340
341         volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
342         private final CountDownLatch latch = new CountDownLatch( 1 );
343
344         @Override
345         public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
346             this.change = change;
347             latch.countDown();
348         }
349
350         void waitForChange() throws InterruptedException {
351             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
352         }
353     }
354
355     static class CommitExecutorService extends ForwardingExecutorService {
356
357         ExecutorService delegate;
358
359         public CommitExecutorService( final ExecutorService delegate ) {
360             this.delegate = delegate;
361         }
362
363         @Override
364         protected ExecutorService delegate() {
365             return delegate;
366         }
367     }
368 }