X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMBrokerTest.java;h=0bb16a39b90f7eb513093b18faa20815061fad3c;hb=c74d5c2399e500fe3e690edc8cee497b1cb6f867;hp=fec73d665b90763a30ac7cc400d2bf364ffdd358;hpb=bcd020ecbeeacc97df1717a238d93bacd87bcbfc;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java index fec73d665b..0bb16a39b9 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java @@ -3,26 +3,40 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -31,6 +45,7 @@ public class DOMBrokerTest { private SchemaContext schemaContext; private DOMDataBrokerImpl domBroker; + private ListeningExecutorService executor; @Before public void setupStore() { @@ -46,11 +61,19 @@ public class DOMBrokerTest { .put(OPERATIONAL, operStore) // .build(); - ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(), + TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION); domBroker = new DOMDataBrokerImpl(stores, executor); } - @Test + @After + public void tearDown() { + if( executor != null ) { + executor.shutdownNow(); + } + } + + @Test(timeout=10000) public void testTransactionIsolation() throws InterruptedException, ExecutionException { assertNotNull(domBroker); @@ -86,7 +109,7 @@ public class DOMBrokerTest { assertFalse(readTxContainer.get().isPresent()); } - @Test + @Test(timeout=10000) public void testTransactionCommit() throws InterruptedException, ExecutionException { DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); @@ -107,13 +130,180 @@ public class DOMBrokerTest { TestModel.TEST_PATH); assertTrue(writeTxContainer.get().isPresent()); - writeTx.commit().get(); + writeTx.submit().get(); Optional> afterCommitRead = domBroker.newReadOnlyTransaction() .read(OPERATIONAL, TestModel.TEST_PATH).get(); assertTrue(afterCommitRead.isPresent()); } + /** + * Tests a simple DataChangeListener notification after a write. + */ + @Test + public void testDataChangeListener() throws Throwable { + + final NormalizedNode testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME ); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener(); + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + + dcListener.waitForChange(); + + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + NormalizedNode actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH ); + assertEquals( "Created node", testNode, actualNode ); + } + + /** + * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method. + * This should succeed without deadlock. + */ + @Test + public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable { + + final AtomicReference caughtCommitEx = new AtomicReference<>(); + final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 ); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { + @Override + public void onDataChanged( AsyncDataChangeEvent> change ) { + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put( OPERATIONAL, TestModel.TEST2_PATH, + ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) ); + Futures.addCallback( writeTx.submit(), new FutureCallback() { + @Override + public void onSuccess( Void result ) { + commitCompletedLatch.countDown(); + } + + @Override + public void onFailure( Throwable t ) { + caughtCommitEx.set( t ); + commitCompletedLatch.countDown(); + } + } ); + + super.onDataChanged( change ); + } + }; + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + + dcListener.waitForChange(); + + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) ); + + if( caughtCommitEx.get() != null ) { + throw caughtCommitEx.get(); + } + } + + /** + * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method. + * This should throw an exception and not deadlock. + */ + @Test(expected=TransactionCommitDeadlockException.class) + public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable { + + final AtomicReference caughtCommitEx = new AtomicReference<>(); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { + @Override + public void onDataChanged( AsyncDataChangeEvent> change ) { + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put( OPERATIONAL, TestModel.TEST2_PATH, + ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) ); + try { + writeTx.submit().get(); + } catch( ExecutionException e ) { + caughtCommitEx.set( e.getCause() ); + } catch( Exception e ) { + caughtCommitEx.set( e ); + } + finally { + super.onDataChanged( change ); + } + } + }; + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + dcListener.waitForChange(); + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + if( caughtCommitEx.get() != null ) { + throw caughtCommitEx.get(); + } + } + + AtomicReference submitTxAsync( final DOMDataWriteTransaction writeTx ) { + final AtomicReference caughtEx = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + + try { + writeTx.submit(); + } catch( Throwable e ) { + caughtEx.set( e ); + } + } + + }.start(); + + return caughtEx; + } + + static class TestDOMDataChangeListener implements DOMDataChangeListener { + + volatile AsyncDataChangeEvent> change; + private final CountDownLatch latch = new CountDownLatch( 1 ); + + @Override + public void onDataChanged( AsyncDataChangeEvent> change ) { + this.change = change; + latch.countDown(); + } + + void waitForChange() throws InterruptedException { + assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) ); + } + } }