From: Tony Tkacik Date: Wed, 6 Aug 2014 17:05:10 +0000 (+0000) Subject: Merge "Bug 1386: Avoid commit deadlock" X-Git-Tag: release/helium~339 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=cc6c063be5f143cd601208ef26604d90a25bd1a5;hp=482b973992d39909a99d9241fb32c7cd562ef97c Merge "Bug 1386: Avoid commit deadlock" --- diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitDeadlockException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitDeadlockException.java new file mode 100644 index 0000000000..60313bf109 --- /dev/null +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitDeadlockException.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.md.sal.common.api.data; + +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResultBuilder; +import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; + +import com.google.common.base.Function; + +/** + * A type of TransactionCommitFailedException that indicates a situation that would result in a + * threading deadlock. This can occur if a caller that submits a write transaction tries to perform + * a blocking call via one of the get methods on the returned ListenableFuture. Callers + * should process the commit result asynchronously (via Futures#addCallback) to ensure deadlock + * won't occur. + * + * @author Thomas Pantelis + */ +public class TransactionCommitDeadlockException extends TransactionCommitFailedException { + + private static final long serialVersionUID = 1L; + + private static final String DEADLOCK_MESSAGE = + "An attempt to block on a ListenableFuture via a get method from a write " + + "transaction submit was detected that would result in deadlock. The commit " + + "result must be obtained asynchronously, e.g. via Futures#addCallback, to avoid deadlock."; + + public static Function DEADLOCK_EXECUTOR_FUNCTION = new Function() { + @Override + public Exception apply(Void notUsed) { + return new TransactionCommitDeadlockException( DEADLOCK_MESSAGE, + RpcResultBuilder.newError(ErrorType.APPLICATION, "lock-denied", DEADLOCK_MESSAGE)); + } + }; + + public TransactionCommitDeadlockException(String message, final RpcError... errors) { + super(message, errors); + } +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java index 667c0fc282..22dad6af23 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java @@ -10,9 +10,11 @@ package org.opendaylight.controller.config.yang.md.sal.dom.impl; import java.util.concurrent.Executors; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListeningExecutorService; @@ -64,7 +66,9 @@ public final class DomInmemoryDataBrokerModule extends . builder().put(LogicalDatastoreType.OPERATIONAL, operStore) .put(LogicalDatastoreType.CONFIGURATION, configStore).build(); - DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())); + DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, + new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(), + TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION)); return newDataBroker; } diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java index b006ca94e5..0bb16a39b9 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java @@ -3,26 +3,40 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -31,6 +45,7 @@ public class DOMBrokerTest { private SchemaContext schemaContext; private DOMDataBrokerImpl domBroker; + private ListeningExecutorService executor; @Before public void setupStore() { @@ -46,11 +61,19 @@ public class DOMBrokerTest { .put(OPERATIONAL, operStore) // .build(); - ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(), + TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION); domBroker = new DOMDataBrokerImpl(stores, executor); } - @Test + @After + public void tearDown() { + if( executor != null ) { + executor.shutdownNow(); + } + } + + @Test(timeout=10000) public void testTransactionIsolation() throws InterruptedException, ExecutionException { assertNotNull(domBroker); @@ -86,7 +109,7 @@ public class DOMBrokerTest { assertFalse(readTxContainer.get().isPresent()); } - @Test + @Test(timeout=10000) public void testTransactionCommit() throws InterruptedException, ExecutionException { DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); @@ -114,6 +137,173 @@ public class DOMBrokerTest { assertTrue(afterCommitRead.isPresent()); } + /** + * Tests a simple DataChangeListener notification after a write. + */ + @Test + public void testDataChangeListener() throws Throwable { + + final NormalizedNode testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME ); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener(); + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + + dcListener.waitForChange(); + + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + NormalizedNode actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH ); + assertEquals( "Created node", testNode, actualNode ); + } + + /** + * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method. + * This should succeed without deadlock. + */ + @Test + public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable { + + final AtomicReference caughtCommitEx = new AtomicReference<>(); + final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 ); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { + @Override + public void onDataChanged( AsyncDataChangeEvent> change ) { + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put( OPERATIONAL, TestModel.TEST2_PATH, + ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) ); + Futures.addCallback( writeTx.submit(), new FutureCallback() { + @Override + public void onSuccess( Void result ) { + commitCompletedLatch.countDown(); + } + + @Override + public void onFailure( Throwable t ) { + caughtCommitEx.set( t ); + commitCompletedLatch.countDown(); + } + } ); + + super.onDataChanged( change ); + } + }; + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + + dcListener.waitForChange(); + + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) ); + + if( caughtCommitEx.get() != null ) { + throw caughtCommitEx.get(); + } + } + + /** + * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method. + * This should throw an exception and not deadlock. + */ + @Test(expected=TransactionCommitDeadlockException.class) + public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable { + + final AtomicReference caughtCommitEx = new AtomicReference<>(); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { + @Override + public void onDataChanged( AsyncDataChangeEvent> change ) { + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put( OPERATIONAL, TestModel.TEST2_PATH, + ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) ); + try { + writeTx.submit().get(); + } catch( ExecutionException e ) { + caughtCommitEx.set( e.getCause() ); + } catch( Exception e ) { + caughtCommitEx.set( e ); + } + finally { + super.onDataChanged( change ); + } + } + }; + + domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, + dcListener, DataChangeScope.BASE ); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull( writeTx ); + + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); + + AtomicReference caughtEx = submitTxAsync( writeTx ); + dcListener.waitForChange(); + if( caughtEx.get() != null ) { + throw caughtEx.get(); + } + + if( caughtCommitEx.get() != null ) { + throw caughtCommitEx.get(); + } + } + + AtomicReference submitTxAsync( final DOMDataWriteTransaction writeTx ) { + final AtomicReference caughtEx = new AtomicReference<>(); + new Thread() { + @Override + public void run() { + + try { + writeTx.submit(); + } catch( Throwable e ) { + caughtEx.set( e ); + } + } + + }.start(); + + return caughtEx; + } + + static class TestDOMDataChangeListener implements DOMDataChangeListener { + + volatile AsyncDataChangeEvent> change; + private final CountDownLatch latch = new CountDownLatch( 1 ); + + @Override + public void onDataChanged( AsyncDataChangeEvent> change ) { + this.change = change; + latch.countDown(); + } + + void waitForChange() throws InterruptedException { + assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) ); + } + } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestModel.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestModel.java index d5ba2a2b9a..09835ec5e3 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestModel.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestModel.java @@ -21,6 +21,8 @@ public class TestModel { public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", "test"); + public static final QName TEST2_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", + "test2"); public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); @@ -30,6 +32,7 @@ public class TestModel { private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); + public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME); public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build(); public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two"); public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three"); diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/resources/odl-datastore-test.yang b/opendaylight/md-sal/sal-dom-broker/src/test/resources/odl-datastore-test.yang index 17541fecab..5fbf470f09 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/resources/odl-datastore-test.yang +++ b/opendaylight/md-sal/sal-dom-broker/src/test/resources/odl-datastore-test.yang @@ -39,4 +39,7 @@ module odl-datastore-test { } } } + + container test2 { + } } \ No newline at end of file