X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMBrokerTest.java;h=cd8ac0998052f73c94a558443b3663583e82125b;hp=fec73d665b90763a30ac7cc400d2bf364ffdd358;hb=5c008222efa5c0af49cf8a52881a6299b1e249dc;hpb=c4940d6fa1f6928b7189afb2dc6964fb2f2cbae2 diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java index fec73d665b..cd8ac09980 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java @@ -1,41 +1,65 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; - +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ForwardingExecutorService; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - public class DOMBrokerTest { private SchemaContext schemaContext; - private DOMDataBrokerImpl domBroker; + private AbstractDOMDataBroker domBroker; + private ListeningExecutorService executor; + private ExecutorService futureExecutor; + private CommitExecutorService commitExecutor; @Before public void setupStore() { - InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor()); + + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", + MoreExecutors.newDirectExecutorService()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", + MoreExecutors.newDirectExecutorService()); schemaContext = TestModel.createTestContext(); operStore.onGlobalContextUpdated(schemaContext); @@ -46,11 +70,25 @@ public class DOMBrokerTest { .put(OPERATIONAL, operStore) // .build(); - ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - domBroker = new DOMDataBrokerImpl(stores, executor); + commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor()); + futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB"); + executor = new DeadlockDetectingListeningExecutorService(commitExecutor, + TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor); + domBroker = new SerializedDOMDataBroker(stores, executor); } - @Test + @After + public void tearDown() { + if( executor != null ) { + executor.shutdownNow(); + } + + if(futureExecutor != null) { + futureExecutor.shutdownNow(); + } + } + + @Test(timeout=10000) public void testTransactionIsolation() throws InterruptedException, ExecutionException { assertNotNull(domBroker); @@ -86,7 +124,7 @@ public class DOMBrokerTest { assertFalse(readTxContainer.get().isPresent()); } - @Test + @Test(timeout=10000) public void testTransactionCommit() throws InterruptedException, ExecutionException { DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); @@ -107,13 +145,212 @@ public class DOMBrokerTest { TestModel.TEST_PATH); assertTrue(writeTxContainer.get().isPresent()); - writeTx.commit().get(); + writeTx.submit().get(); Optional> afterCommitRead = domBroker.newReadOnlyTransaction() .read(OPERATIONAL, TestModel.TEST_PATH).get(); assertTrue(afterCommitRead.isPresent()); } + @Test(expected=TransactionCommitFailedException.class) + public void testRejectedCommit() throws Exception { + + commitExecutor.delegate = Mockito.mock( ExecutorService.class ); + Mockito.doThrow( new RejectedExecutionException( "mock" ) ) + .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) ); + Mockito.doNothing().when( commitExecutor.delegate ).shutdown(); + Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow(); + Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString(); + Mockito.doReturn( true ).when( commitExecutor.delegate ) + .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) ); + + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) ); + + writeTx.submit().checkedGet( 5, TimeUnit.SECONDS ); + } + + /** + * Tests a simple DataChangeListener notification after a write. + */ + @Test + public void testDataChangeListener() throws Throwable { + + final NormalizedNode testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME ); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener(); + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + + dcListener.waitForChange(); + + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + NormalizedNode actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH ); + assertEquals( "Created node", testNode, actualNode ); + } + + /** + * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method. + * This should succeed without deadlock. + */ + @Test + public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable { + + final AtomicReference caughtCommitEx = new AtomicReference<>(); + final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 ); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { + @Override + public void onDataChanged( final AsyncDataChangeEvent> change ) { + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put( OPERATIONAL, TestModel.TEST2_PATH, + ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) ); + Futures.addCallback( writeTx.submit(), new FutureCallback() { + @Override + public void onSuccess( final Void result ) { + commitCompletedLatch.countDown(); + } + + @Override + public void onFailure( final Throwable t ) { + caughtCommitEx.set( t ); + commitCompletedLatch.countDown(); + } + } ); + + super.onDataChanged( change ); + } + }; + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + + dcListener.waitForChange(); + + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) ); + + if( caughtCommitEx.get() != null ) { + throw caughtCommitEx.get(); + } + } + + /** + * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method. + * This should throw an exception and not deadlock. + */ + @Test(expected=TransactionCommitDeadlockException.class) + public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable { + + final AtomicReference caughtCommitEx = new AtomicReference<>(); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { + @Override + public void onDataChanged( final AsyncDataChangeEvent> change ) { + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put( OPERATIONAL, TestModel.TEST2_PATH, + ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) ); + try { + writeTx.submit().get(); + } catch( ExecutionException e ) { + caughtCommitEx.set( e.getCause() ); + } catch( Exception e ) { + caughtCommitEx.set( e ); + } + finally { + super.onDataChanged( change ); + } + } + }; + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + + dcListener.waitForChange(); + + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + if( caughtCommitEx.get() != null ) { + throw caughtCommitEx.get(); + } + } + + AtomicReference submitTxAsync( final DOMDataWriteTransaction writeTx ) { + final AtomicReference caughtEx = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + + try { + writeTx.submit(); + } catch( Throwable e ) { + caughtEx.set( e ); + } + } + }.start(); + + return caughtEx; + } + static class TestDOMDataChangeListener implements DOMDataChangeListener { + + volatile AsyncDataChangeEvent> change; + private final CountDownLatch latch = new CountDownLatch( 1 ); + + @Override + public void onDataChanged( final AsyncDataChangeEvent> change ) { + this.change = change; + latch.countDown(); + } + + void waitForChange() throws InterruptedException { + assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) ); + } + } + + static class CommitExecutorService extends ForwardingExecutorService { + + ExecutorService delegate; + + public CommitExecutorService( final ExecutorService delegate ) { + this.delegate = delegate; + } + + @Override + protected ExecutorService delegate() { + return delegate; + } + } }