X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMBrokerTest.java;h=e2baf3ecbf2ae993c907fad8a3f702d263e4fc07;hp=b006ca94e5d1387bfd84e7a76eccba3700756905;hb=2d60632f7cf63712e8357a3cf3fc40d83366e5e6;hpb=c46e223995956f1f759c551163c212947c1e2fb7 diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java index b006ca94e5..e2baf3ecbf 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java @@ -1,56 +1,104 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.md.sal.dom.broker.impl; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ForwardingExecutorService; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import com.google.common.base.Optional; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - public class DOMBrokerTest { private SchemaContext schemaContext; - private DOMDataBrokerImpl domBroker; + private AbstractDOMDataBroker domBroker; + private ListeningExecutorService executor; + private ExecutorService futureExecutor; + private CommitExecutorService commitExecutor; @Before public void setupStore() { - InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor()); + + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService()); schemaContext = TestModel.createTestContext(); operStore.onGlobalContextUpdated(schemaContext); configStore.onGlobalContextUpdated(schemaContext); - ImmutableMap stores = ImmutableMap. builder() // + final ImmutableMap stores = + ImmutableMap.builder() // .put(CONFIGURATION, configStore) // .put(OPERATIONAL, operStore) // .build(); - ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - domBroker = new DOMDataBrokerImpl(stores, executor); + commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor()); + futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB"); + executor = new DeadlockDetectingListeningExecutorService(commitExecutor, + TransactionCommitDeadlockException + .DEADLOCK_EXCEPTION_SUPPLIER, + futureExecutor); + domBroker = new SerializedDOMDataBroker(stores, executor); } - @Test + @After + public void tearDown() { + if (executor != null) { + executor.shutdownNow(); + } + + if (futureExecutor != null) { + futureExecutor.shutdownNow(); + } + } + + @Test(timeout = 10000) public void testTransactionIsolation() throws InterruptedException, ExecutionException { assertNotNull(domBroker); @@ -68,16 +116,14 @@ public class DOMBrokerTest { writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); /** - * * Reads /test from writeTx Read should return container. * */ - ListenableFuture>> writeTxContainer = writeTx.read(OPERATIONAL, - TestModel.TEST_PATH); + ListenableFuture>> writeTxContainer = writeTx + .read(OPERATIONAL, TestModel.TEST_PATH); assertTrue(writeTxContainer.get().isPresent()); /** - * * Reads /test from readTx Read should return Absent. * */ @@ -86,7 +132,7 @@ public class DOMBrokerTest { assertFalse(readTxContainer.get().isPresent()); } - @Test + @Test(timeout = 10000) public void testTransactionCommit() throws InterruptedException, ExecutionException { DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); @@ -99,12 +145,11 @@ public class DOMBrokerTest { writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); /** - * * Reads /test from writeTx Read should return container. * */ - ListenableFuture>> writeTxContainer = writeTx.read(OPERATIONAL, - TestModel.TEST_PATH); + ListenableFuture>> writeTxContainer = writeTx + .read(OPERATIONAL, TestModel.TEST_PATH); assertTrue(writeTxContainer.get().isPresent()); writeTx.submit().get(); @@ -114,6 +159,198 @@ public class DOMBrokerTest { assertTrue(afterCommitRead.isPresent()); } + @Test(expected = TransactionCommitFailedException.class) + public void testRejectedCommit() throws Exception { + + commitExecutor.delegate = Mockito.mock(ExecutorService.class); + Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate) + .execute(Mockito.any(Runnable.class)); + Mockito.doNothing().when(commitExecutor.delegate).shutdown(); + Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow(); + Mockito.doReturn("").when(commitExecutor.delegate).toString(); + Mockito.doReturn(true).when(commitExecutor.delegate) + .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class)); + + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + } + + /** + * Tests a simple DataChangeListener notification after a write. + */ + @Test + @SuppressWarnings("checkstyle:IllegalThrows") + public void testDataChangeListener() throws Throwable { + + final NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener(); + domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull(writeTx); + + writeTx.put(OPERATIONAL, TestModel.TEST_PATH, testNode); + + AtomicReference caughtEx = submitTxAsync(writeTx); + + dcListener.waitForChange(); + + if (caughtEx.get() != null) { + throw caughtEx.get(); + } + + NormalizedNode actualNode = dcListener.capturedChange.getCreatedData().get(TestModel.TEST_PATH); + assertEquals("Created node", testNode, actualNode); + } + + /** + * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method. + * This should succeed without deadlock. + */ + @Test + @SuppressWarnings("checkstyle:IllegalThrows") + public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable { + + final AtomicReference caughtCommitEx = new AtomicReference<>(); + final CountDownLatch commitCompletedLatch = new CountDownLatch(1); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { + @Override + public void onDataChanged(final AsyncDataChangeEvent> change) { + + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME)); + Futures.addCallback(writeTx.submit(), new FutureCallback() { + @Override + public void onSuccess(final Void result) { + commitCompletedLatch.countDown(); + } + + @Override + public void onFailure(final Throwable throwable) { + caughtCommitEx.set(throwable); + commitCompletedLatch.countDown(); + } + }); + + super.onDataChanged(change); + } + }; + + domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull(writeTx); + + writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + AtomicReference caughtEx = submitTxAsync(writeTx); + + dcListener.waitForChange(); + + if (caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertTrue("Commit Future was not invoked", commitCompletedLatch.await(5, TimeUnit.SECONDS)); + + if (caughtCommitEx.get() != null) { + throw caughtCommitEx.get(); + } + } + + /** + * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method. + * This should throw an exception and not deadlock. + */ + @Test(expected = TransactionCommitDeadlockException.class) + @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:IllegalCatch"}) + public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable { + + final AtomicReference caughtCommitEx = new AtomicReference<>(); + + TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { + @Override + public void onDataChanged(final AsyncDataChangeEvent> change) { + DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME)); + try { + writeTx.submit().get(); + } catch (ExecutionException e) { + caughtCommitEx.set(e.getCause()); + } catch (Exception e) { + caughtCommitEx.set(e); + } finally { + super.onDataChanged(change); + } + } + }; + + domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE); + + final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); + assertNotNull(writeTx); + + writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + AtomicReference caughtEx = submitTxAsync(writeTx); + + dcListener.waitForChange(); + + if (caughtEx.get() != null) { + throw caughtEx.get(); + } + + if (caughtCommitEx.get() != null) { + throw caughtCommitEx.get(); + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + AtomicReference submitTxAsync(final DOMDataWriteTransaction writeTx) { + final AtomicReference caughtEx = new AtomicReference<>(); + new Thread(() -> { + try { + writeTx.submit(); + } catch (Throwable e) { + caughtEx.set(e); + } + }).start(); + + return caughtEx; + } + + static class TestDOMDataChangeListener implements DOMDataChangeListener { + + volatile AsyncDataChangeEvent> capturedChange; + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public void onDataChanged(final AsyncDataChangeEvent> change) { + this.capturedChange = change; + latch.countDown(); + } + + void waitForChange() throws InterruptedException { + assertTrue("onDataChanged was not called", latch.await(5, TimeUnit.SECONDS)); + } + } + + static class CommitExecutorService extends ForwardingExecutorService { + + ExecutorService delegate; + + CommitExecutorService(final ExecutorService delegate) { + this.delegate = delegate; + } + + @Override + protected ExecutorService delegate() { + return delegate; + } + } }