1 package org.opendaylight.controller.md.sal.dom.broker.impl;
3 import static org.junit.Assert.assertFalse;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.assertEquals;
7 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
8 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
10 import java.util.Collections;
11 import java.util.concurrent.CountDownLatch;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.RejectedExecutionException;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.atomic.AtomicReference;
19 import org.junit.After;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.Mockito;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
31 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
32 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
33 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
35 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
36 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import com.google.common.base.Optional;
43 import com.google.common.collect.ImmutableMap;
44 import com.google.common.util.concurrent.ForwardingExecutorService;
45 import com.google.common.util.concurrent.FutureCallback;
46 import com.google.common.util.concurrent.Futures;
47 import com.google.common.util.concurrent.ListenableFuture;
48 import com.google.common.util.concurrent.ListeningExecutorService;
49 import com.google.common.util.concurrent.MoreExecutors;
51 public class DOMBrokerTest {
53 private SchemaContext schemaContext;
54 private DOMDataBrokerImpl domBroker;
55 private ListeningExecutorService executor;
56 private ExecutorService futureExecutor;
57 private CommitExecutorService commitExecutor;
60 public void setupStore() {
62 InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
63 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
64 InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
65 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
66 schemaContext = TestModel.createTestContext();
68 operStore.onGlobalContextUpdated(schemaContext);
69 configStore.onGlobalContextUpdated(schemaContext);
71 ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
72 .put(CONFIGURATION, configStore) //
73 .put(OPERATIONAL, operStore) //
76 commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
77 futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
78 executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
79 TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor);
80 domBroker = new DOMDataBrokerImpl(stores, executor);
84 public void tearDown() {
85 if( executor != null ) {
86 executor.shutdownNow();
89 if(futureExecutor != null) {
90 futureExecutor.shutdownNow();
95 public void testTransactionIsolation() throws InterruptedException, ExecutionException {
97 assertNotNull(domBroker);
99 DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
100 assertNotNull(readTx);
102 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
103 assertNotNull(writeTx);
106 * Writes /test in writeTx
109 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
113 * Reads /test from writeTx Read should return container.
116 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
117 TestModel.TEST_PATH);
118 assertTrue(writeTxContainer.get().isPresent());
122 * Reads /test from readTx Read should return Absent.
125 ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
126 .read(OPERATIONAL, TestModel.TEST_PATH);
127 assertFalse(readTxContainer.get().isPresent());
131 public void testTransactionCommit() throws InterruptedException, ExecutionException {
133 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
134 assertNotNull(writeTx);
137 * Writes /test in writeTx
140 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
144 * Reads /test from writeTx Read should return container.
147 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
148 TestModel.TEST_PATH);
149 assertTrue(writeTxContainer.get().isPresent());
151 writeTx.submit().get();
153 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
154 .read(OPERATIONAL, TestModel.TEST_PATH).get();
155 assertTrue(afterCommitRead.isPresent());
158 @Test(expected=TransactionCommitFailedException.class)
159 public void testRejectedCommit() throws Exception {
161 commitExecutor.delegate = Mockito.mock( ExecutorService.class );
162 Mockito.doThrow( new RejectedExecutionException( "mock" ) )
163 .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
164 Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
165 Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
166 Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
167 Mockito.doReturn( true ).when( commitExecutor.delegate )
168 .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
170 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
171 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
173 writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
177 * Tests a simple DataChangeListener notification after a write.
180 public void testDataChangeListener() throws Throwable {
182 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
184 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
186 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
187 dcListener, DataChangeScope.BASE );
189 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
190 assertNotNull( writeTx );
192 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
194 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
196 dcListener.waitForChange();
198 if( caughtEx.get() != null ) {
199 throw caughtEx.get();
202 NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
203 assertEquals( "Created node", testNode, actualNode );
207 * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
208 * This should succeed without deadlock.
211 public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
213 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
214 final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
216 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
218 public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
220 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
221 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
222 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
223 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
225 public void onSuccess( Void result ) {
226 commitCompletedLatch.countDown();
230 public void onFailure( Throwable t ) {
231 caughtCommitEx.set( t );
232 commitCompletedLatch.countDown();
236 super.onDataChanged( change );
240 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
241 dcListener, DataChangeScope.BASE );
243 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
244 assertNotNull( writeTx );
246 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
248 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
250 dcListener.waitForChange();
252 if( caughtEx.get() != null ) {
253 throw caughtEx.get();
256 assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
258 if( caughtCommitEx.get() != null ) {
259 throw caughtCommitEx.get();
264 * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
265 * This should throw an exception and not deadlock.
267 @Test(expected=TransactionCommitDeadlockException.class)
268 public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
270 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
272 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
274 public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
275 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
276 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
277 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
279 writeTx.submit().get();
280 } catch( ExecutionException e ) {
281 caughtCommitEx.set( e.getCause() );
282 } catch( Exception e ) {
283 caughtCommitEx.set( e );
286 super.onDataChanged( change );
291 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
292 dcListener, DataChangeScope.BASE );
294 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
295 assertNotNull( writeTx );
297 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
299 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
301 dcListener.waitForChange();
303 if( caughtEx.get() != null ) {
304 throw caughtEx.get();
307 if( caughtCommitEx.get() != null ) {
308 throw caughtCommitEx.get();
312 AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
313 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
320 } catch( Throwable e ) {
330 static class TestDOMDataChangeListener implements DOMDataChangeListener {
332 volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
333 private final CountDownLatch latch = new CountDownLatch( 1 );
336 public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
337 this.change = change;
341 void waitForChange() throws InterruptedException {
342 assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
346 static class CommitExecutorService extends ForwardingExecutorService {
348 ExecutorService delegate;
350 public CommitExecutorService( ExecutorService delegate ) {
351 this.delegate = delegate;
355 protected ExecutorService delegate() {