1 package org.opendaylight.controller.md.sal.dom.broker.impl;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
8 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
9 import com.google.common.base.Optional;
10 import com.google.common.collect.ImmutableMap;
11 import com.google.common.util.concurrent.ForwardingExecutorService;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Collections;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.RejectedExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicReference;
25 import org.junit.After;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.mockito.Mockito;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
31 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
34 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
35 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
36 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
37 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
38 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
39 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
41 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
42 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
48 public class DOMBrokerTest {
50 private SchemaContext schemaContext;
51 private DOMDataBrokerImpl domBroker;
52 private ListeningExecutorService executor;
53 private ExecutorService futureExecutor;
54 private CommitExecutorService commitExecutor;
57 public void setupStore() {
59 InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
60 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
61 InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
62 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
63 schemaContext = TestModel.createTestContext();
65 operStore.onGlobalContextUpdated(schemaContext);
66 configStore.onGlobalContextUpdated(schemaContext);
68 ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
69 .put(CONFIGURATION, configStore) //
70 .put(OPERATIONAL, operStore) //
73 commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
74 futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
75 executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
76 TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
77 domBroker = new DOMDataBrokerImpl(stores, executor);
81 public void tearDown() {
82 if( executor != null ) {
83 executor.shutdownNow();
86 if(futureExecutor != null) {
87 futureExecutor.shutdownNow();
92 public void testTransactionIsolation() throws InterruptedException, ExecutionException {
94 assertNotNull(domBroker);
96 DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
97 assertNotNull(readTx);
99 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
100 assertNotNull(writeTx);
103 * Writes /test in writeTx
106 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
110 * Reads /test from writeTx Read should return container.
113 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
114 TestModel.TEST_PATH);
115 assertTrue(writeTxContainer.get().isPresent());
119 * Reads /test from readTx Read should return Absent.
122 ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
123 .read(OPERATIONAL, TestModel.TEST_PATH);
124 assertFalse(readTxContainer.get().isPresent());
128 public void testTransactionCommit() throws InterruptedException, ExecutionException {
130 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
131 assertNotNull(writeTx);
134 * Writes /test in writeTx
137 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
141 * Reads /test from writeTx Read should return container.
144 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
145 TestModel.TEST_PATH);
146 assertTrue(writeTxContainer.get().isPresent());
148 writeTx.submit().get();
150 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
151 .read(OPERATIONAL, TestModel.TEST_PATH).get();
152 assertTrue(afterCommitRead.isPresent());
155 @Test(expected=TransactionCommitFailedException.class)
156 public void testRejectedCommit() throws Exception {
158 commitExecutor.delegate = Mockito.mock( ExecutorService.class );
159 Mockito.doThrow( new RejectedExecutionException( "mock" ) )
160 .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
161 Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
162 Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
163 Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
164 Mockito.doReturn( true ).when( commitExecutor.delegate )
165 .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
167 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
168 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
170 writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
174 * Tests a simple DataChangeListener notification after a write.
177 public void testDataChangeListener() throws Throwable {
179 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
181 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
183 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
184 dcListener, DataChangeScope.BASE );
186 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
187 assertNotNull( writeTx );
189 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
191 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
193 dcListener.waitForChange();
195 if( caughtEx.get() != null ) {
196 throw caughtEx.get();
199 NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
200 assertEquals( "Created node", testNode, actualNode );
204 * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
205 * This should succeed without deadlock.
208 public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
210 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
211 final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
213 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
215 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
217 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
218 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
219 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
220 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
222 public void onSuccess( final Void result ) {
223 commitCompletedLatch.countDown();
227 public void onFailure( final Throwable t ) {
228 caughtCommitEx.set( t );
229 commitCompletedLatch.countDown();
233 super.onDataChanged( change );
237 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
238 dcListener, DataChangeScope.BASE );
240 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
241 assertNotNull( writeTx );
243 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
245 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
247 dcListener.waitForChange();
249 if( caughtEx.get() != null ) {
250 throw caughtEx.get();
253 assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
255 if( caughtCommitEx.get() != null ) {
256 throw caughtCommitEx.get();
261 * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
262 * This should throw an exception and not deadlock.
264 @Test(expected=TransactionCommitDeadlockException.class)
265 public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
267 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
269 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
271 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
272 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
273 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
274 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
276 writeTx.submit().get();
277 } catch( ExecutionException e ) {
278 caughtCommitEx.set( e.getCause() );
279 } catch( Exception e ) {
280 caughtCommitEx.set( e );
283 super.onDataChanged( change );
288 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
289 dcListener, DataChangeScope.BASE );
291 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
292 assertNotNull( writeTx );
294 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
296 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
298 dcListener.waitForChange();
300 if( caughtEx.get() != null ) {
301 throw caughtEx.get();
304 if( caughtCommitEx.get() != null ) {
305 throw caughtCommitEx.get();
// Helper shared by the listener tests. It is declared to return an AtomicReference<Throwable>;
// the callers check that reference for a captured failure after waiting on their listener.
// NOTE(review): the statements between the declaration of caughtEx and the catch clause, and
// everything after the catch clause, are not visible in this view. Presumably writeTx is
// submitted on another thread and any Throwable is stored in caughtEx - confirm against the full file.
309 AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
// Holder for a failure raised while submitting; empty (null) until something is stored in it.
310 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
// Catches every Throwable raised by the guarded statements (the guarded statements are not shown here).
317 } catch( Throwable e ) {
327 static class TestDOMDataChangeListener implements DOMDataChangeListener {
329 volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
330 private final CountDownLatch latch = new CountDownLatch( 1 );
333 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
334 this.change = change;
338 void waitForChange() throws InterruptedException {
339 assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
343 static class CommitExecutorService extends ForwardingExecutorService {
345 ExecutorService delegate;
347 public CommitExecutorService( final ExecutorService delegate ) {
348 this.delegate = delegate;
352 protected ExecutorService delegate() {