Bug 1386: Avoid commit deadlock 10/9210/10
authortpantelis <tpanteli@brocade.com>
Mon, 14 Jul 2014 12:01:16 +0000 (08:01 -0400)
committertpantelis <tpanteli@brocade.com>
Wed, 6 Aug 2014 03:11:28 +0000 (23:11 -0400)
Changed the executor in DOMDataCommitCoordinatorImpl to a new
class DeadlockDetectingListeningExecutorService added to yangtools util.
This detects deadlock if another blocking commit is done by a client
notified from the single commit thread.

Change-Id: Icdc2b835fd336a1bbc37953828126b57253929c0
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitDeadlockException.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestModel.java
opendaylight/md-sal/sal-dom-broker/src/test/resources/odl-datastore-test.yang

diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitDeadlockException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/data/TransactionCommitDeadlockException.java
new file mode 100644 (file)
index 0000000..60313bf
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.common.api.data;
+
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
+
+import com.google.common.base.Function;
+
+/**
+ * A type of TransactionCommitFailedException that indicates a situation that would result in a
+ * threading deadlock. This can occur if a caller that submits a write transaction tries to perform
+ * a blocking call via one of the <code>get</code> methods on the returned ListenableFuture. Callers
+ * should process the commit result asynchronously (via Futures#addCallback) to ensure deadlock
+ * won't occur.
+ *
+ * @author Thomas Pantelis
+ */
+public class TransactionCommitDeadlockException extends TransactionCommitFailedException {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String DEADLOCK_MESSAGE =
+            "An attempt to block on a ListenableFuture via a get method from a write " +
+            "transaction submit was detected that would result in deadlock. The commit " +
+            "result must be obtained asynchronously, e.g. via Futures#addCallback, to avoid deadlock.";
+
+    public static Function<Void, Exception> DEADLOCK_EXECUTOR_FUNCTION = new Function<Void, Exception>() {
+        @Override
+        public Exception apply(Void notUsed) {
+            return new TransactionCommitDeadlockException( DEADLOCK_MESSAGE,
+                    RpcResultBuilder.newError(ErrorType.APPLICATION, "lock-denied", DEADLOCK_MESSAGE));
+        }
+    };
+
+    public TransactionCommitDeadlockException(String message, final RpcError... errors) {
+        super(message, errors);
+    }
+}
index 667c0fc2826a100b2fc6ae77f4c0344a52d723f6..22dad6af23c1464b63f8a4aa25075551b64ef686 100644 (file)
@@ -10,9 +10,11 @@ package org.opendaylight.controller.config.yang.md.sal.dom.impl;
 import java.util.concurrent.Executors;
 
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
 import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -64,7 +66,9 @@ public final class DomInmemoryDataBrokerModule extends
                 .<LogicalDatastoreType, DOMStore> builder().put(LogicalDatastoreType.OPERATIONAL, operStore)
                 .put(LogicalDatastoreType.CONFIGURATION, configStore).build();
 
-        DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
+        DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
+                new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
+                                              TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION));
 
         return newDataBroker;
     }
index b006ca94e5d1387bfd84e7a76eccba3700756905..0bb16a39b90f7eb513093b18faa20815061fad3c 100644 (file)
@@ -3,26 +3,40 @@ package org.opendaylight.controller.md.sal.dom.broker.impl;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -31,6 +45,7 @@ public class DOMBrokerTest {
 
     private SchemaContext schemaContext;
     private DOMDataBrokerImpl domBroker;
+    private ListeningExecutorService executor;
 
     @Before
     public void setupStore() {
@@ -46,11 +61,19 @@ public class DOMBrokerTest {
                 .put(OPERATIONAL, operStore) //
                 .build();
 
-        ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+        executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
+                                          TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
         domBroker = new DOMDataBrokerImpl(stores, executor);
     }
 
-    @Test
+    @After
+    public void tearDown() {
+        if( executor != null ) {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test(timeout=10000)
     public void testTransactionIsolation() throws InterruptedException, ExecutionException {
 
         assertNotNull(domBroker);
@@ -86,7 +109,7 @@ public class DOMBrokerTest {
         assertFalse(readTxContainer.get().isPresent());
     }
 
-    @Test
+    @Test(timeout=10000)
     public void testTransactionCommit() throws InterruptedException, ExecutionException {
 
         DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
@@ -114,6 +137,173 @@ public class DOMBrokerTest {
         assertTrue(afterCommitRead.isPresent());
     }
 
+    /**
+     * Tests a simple DataChangeListener notification after a write.
+     */
+    @Test
+    public void testDataChangeListener() throws Throwable {
+
+        final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
+
+        TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
+
+        domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
+                                              dcListener, DataChangeScope.BASE );
+
+        final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        assertNotNull( writeTx );
+
+        writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
+
+        AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
+
+        dcListener.waitForChange();
+
+        if( caughtEx.get() != null ) {
+            throw caughtEx.get();
+        }
+
+        NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
+        assertEquals( "Created node", testNode, actualNode );
+    }
+
+    /**
+     * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
+     * This should succeed without deadlock.
+     */
+    @Test
+    public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
+
+        final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
+        final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
+
+        TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
+            @Override
+            public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+
+                DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+                writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
+                             ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
+                Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess( Void result ) {
+                        commitCompletedLatch.countDown();
+                    }
+
+                    @Override
+                    public void onFailure( Throwable t ) {
+                        caughtCommitEx.set( t );
+                        commitCompletedLatch.countDown();
+                    }
+                } );
+
+                super.onDataChanged( change );
+            }
+        };
+
+        domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
+                                              dcListener, DataChangeScope.BASE );
+
+        final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        assertNotNull( writeTx );
+
+        writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
+
+        AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
+
+        dcListener.waitForChange();
+
+        if( caughtEx.get() != null ) {
+            throw caughtEx.get();
+        }
+
+        assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
+
+        if( caughtCommitEx.get() != null ) {
+            throw caughtCommitEx.get();
+        }
+    }
+
+    /**
+     * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
+     * This should throw an exception and not deadlock.
+     */
+    @Test(expected=TransactionCommitDeadlockException.class)
+    public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
+
+        final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
+
+        TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
+            @Override
+            public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+                DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+                writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
+                             ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
+                try {
+                    writeTx.submit().get();
+                } catch( ExecutionException e ) {
+                    caughtCommitEx.set( e.getCause() );
+                } catch( Exception e ) {
+                    caughtCommitEx.set( e );
+                }
+                finally {
+                    super.onDataChanged( change );
+                }
+            }
+        };
+
+        domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
+                                              dcListener, DataChangeScope.BASE );
+
+        final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+        assertNotNull( writeTx );
+
+        writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
+
+        AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
 
+        dcListener.waitForChange();
 
+        if( caughtEx.get() != null ) {
+            throw caughtEx.get();
+        }
+
+        if( caughtCommitEx.get() != null ) {
+            throw caughtCommitEx.get();
+        }
+    }
+
+    AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
+        final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
+        new Thread() {
+            @Override
+            public void run() {
+
+                try {
+                    writeTx.submit();
+                } catch( Throwable e ) {
+                    caughtEx.set( e );
+                }
+            }
+
+        }.start();
+
+        return caughtEx;
+    }
+
+    static class TestDOMDataChangeListener implements DOMDataChangeListener {
+
+        volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
+        private final CountDownLatch latch = new CountDownLatch( 1 );
+
+        @Override
+        public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
+            this.change = change;
+            latch.countDown();
+        }
+
+        void waitForChange() throws InterruptedException {
+            assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
+        }
+    }
 }
index d5ba2a2b9a50212e0b75c4036f1afc5fc890b065..09835ec5e37dcd92db413be12cfe5d35967d0910 100644 (file)
@@ -21,6 +21,8 @@ public class TestModel {
 
     public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
             "test");
+    public static final QName TEST2_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
+            "test2");
     public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
     public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
     public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
@@ -30,6 +32,7 @@ public class TestModel {
     private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
     public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+    public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME);
     public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).node(OUTER_LIST_QNAME).build();
     public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
     public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
index 17541fecab02c62f35c7a64cd679ad3ca52849e6..5fbf470f090e44e9e3c080998baa1c1a3a948c46 100644 (file)
@@ -39,4 +39,7 @@ module odl-datastore-test {
             }
         }
     }
+
+    container test2 {
+    }
 }
\ No newline at end of file