X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMBrokerTest.java;h=b2fb3c2c3e9421f832576a0f21b90c3859dcbb3a;hp=674d2ff44a24a959dd7a0ca7f3ecc6bcb5662f03;hb=8e4580f8989c7a40861be1c025822ebba4f1cb07;hpb=1d947c2e75c506dd497a15e3eef61bfa65215a35 diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java index 674d2ff44a..b2fb3c2c3e 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java @@ -1,37 +1,40 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.md.sal.dom.broker.impl; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; + import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ForwardingExecutorService; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.util.Collections; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; @@ -40,7 +43,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -48,7 +50,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class DOMBrokerTest { private SchemaContext schemaContext; - private DOMDataBrokerImpl domBroker; + private AbstractDOMDataBroker domBroker; private ListeningExecutorService executor; private ExecutorService futureExecutor; private CommitExecutorService commitExecutor; @@ -56,39 +58,40 @@ public class DOMBrokerTest { @Before public void setupStore() { - InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", - MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); - InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", - MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService()); schemaContext = TestModel.createTestContext(); operStore.onGlobalContextUpdated(schemaContext); configStore.onGlobalContextUpdated(schemaContext); - ImmutableMap stores = ImmutableMap. builder() // + final ImmutableMap stores = + ImmutableMap.builder() // .put(CONFIGURATION, configStore) // .put(OPERATIONAL, operStore) // .build(); commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor()); - futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB"); + futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB", DOMBrokerTest.class); executor = new DeadlockDetectingListeningExecutorService(commitExecutor, - TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor); - domBroker = new DOMDataBrokerImpl(stores, executor); + TransactionCommitDeadlockException + .DEADLOCK_EXCEPTION_SUPPLIER, + futureExecutor); + domBroker = new SerializedDOMDataBroker(stores, executor); } @After public void tearDown() { - if( executor != null ) { + if (executor != null) { executor.shutdownNow(); } - if(futureExecutor != null) { + if (futureExecutor != null) { futureExecutor.shutdownNow(); } } - @Test(timeout=10000) + @Test(timeout = 10000) public void testTransactionIsolation() throws InterruptedException, ExecutionException { assertNotNull(domBroker); @@ -106,16 +109,14 @@ public class DOMBrokerTest { writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); /** - * * Reads /test from writeTx Read should return container. * */ - ListenableFuture>> writeTxContainer = writeTx.read(OPERATIONAL, - TestModel.TEST_PATH); + ListenableFuture>> writeTxContainer = writeTx + .read(OPERATIONAL, TestModel.TEST_PATH); assertTrue(writeTxContainer.get().isPresent()); /** - * * Reads /test from readTx Read should return Absent. * */ @@ -124,8 +125,8 @@ public class DOMBrokerTest { assertFalse(readTxContainer.get().isPresent()); } - @Test(timeout=10000) - public void testTransactionCommit() throws InterruptedException, ExecutionException { + @Test(timeout = 10000) + public void testTransactionCommit() throws InterruptedException, ExecutionException, TimeoutException { DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); assertNotNull(writeTx); @@ -137,214 +138,91 @@ public class DOMBrokerTest { writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); /** - * * Reads /test from writeTx Read should return container. * */ - ListenableFuture>> writeTxContainer = writeTx.read(OPERATIONAL, - TestModel.TEST_PATH); + ListenableFuture>> writeTxContainer = writeTx + .read(OPERATIONAL, TestModel.TEST_PATH); assertTrue(writeTxContainer.get().isPresent()); - writeTx.submit().get(); + writeTx.commit().get(5, TimeUnit.SECONDS); Optional> afterCommitRead = domBroker.newReadOnlyTransaction() .read(OPERATIONAL, TestModel.TEST_PATH).get(); assertTrue(afterCommitRead.isPresent()); } - @Test(expected=TransactionCommitFailedException.class) - public void testRejectedCommit() throws Exception { - - commitExecutor.delegate = Mockito.mock( ExecutorService.class ); - Mockito.doThrow( new RejectedExecutionException( "mock" ) ) - .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) ); - Mockito.doNothing().when( commitExecutor.delegate ).shutdown(); - Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow(); - Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString(); - Mockito.doReturn( true ).when( commitExecutor.delegate ) - .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) ); + @Test(timeout = 10000) + @Deprecated + public void testTransactionSubmit() throws InterruptedException, ExecutionException, TimeoutException { DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); - writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) ); - - writeTx.submit().checkedGet( 5, TimeUnit.SECONDS ); - } - - /** - * Tests a simple DataChangeListener notification after a write. - */ - @Test - public void testDataChangeListener() throws Throwable { - - final NormalizedNode testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME ); - - TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener(); - - domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, - dcListener, DataChangeScope.BASE ); - - final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); - assertNotNull( writeTx ); - - writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode ); - - AtomicReference caughtEx = submitTxAsync( writeTx ); - - dcListener.waitForChange(); - - if( caughtEx.get() != null ) { - throw caughtEx.get(); - } - - NormalizedNode actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH ); - assertEquals( "Created node", testNode, actualNode ); - } - - /** - * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method. - * This should succeed without deadlock. - */ - @Test - public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable { - - final AtomicReference caughtCommitEx = new AtomicReference<>(); - final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 ); - - TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { - @Override - public void onDataChanged( final AsyncDataChangeEvent> change ) { - - DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); - writeTx.put( OPERATIONAL, TestModel.TEST2_PATH, - ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) ); - Futures.addCallback( writeTx.submit(), new FutureCallback() { - @Override - public void onSuccess( final Void result ) { - commitCompletedLatch.countDown(); - } - - @Override - public void onFailure( final Throwable t ) { - caughtCommitEx.set( t ); - commitCompletedLatch.countDown(); - } - } ); - - super.onDataChanged( change ); - } - }; - - domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, - dcListener, DataChangeScope.BASE ); - - final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); - assertNotNull( writeTx ); - - writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); - - AtomicReference caughtEx = submitTxAsync( writeTx ); - - dcListener.waitForChange(); + assertNotNull(writeTx); + /** + * + * Writes /test in writeTx + * + */ + writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - if( caughtEx.get() != null ) { - throw caughtEx.get(); - } + /** + * Reads /test from writeTx Read should return container. + * + */ + ListenableFuture>> writeTxContainer = writeTx + .read(OPERATIONAL, TestModel.TEST_PATH); + assertTrue(writeTxContainer.get().isPresent()); - assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) ); + writeTx.submit().get(5, TimeUnit.SECONDS); - if( caughtCommitEx.get() != null ) { - throw caughtCommitEx.get(); - } + Optional> afterCommitRead = domBroker.newReadOnlyTransaction() + .read(OPERATIONAL, TestModel.TEST_PATH).get(); + assertTrue(afterCommitRead.isPresent()); } - /** - * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method. - * This should throw an exception and not deadlock. - */ - @Test(expected=TransactionCommitDeadlockException.class) - public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable { - - final AtomicReference caughtCommitEx = new AtomicReference<>(); - - TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() { - @Override - public void onDataChanged( final AsyncDataChangeEvent> change ) { - DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); - writeTx.put( OPERATIONAL, TestModel.TEST2_PATH, - ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) ); - try { - writeTx.submit().get(); - } catch( ExecutionException e ) { - caughtCommitEx.set( e.getCause() ); - } catch( Exception e ) { - caughtCommitEx.set( e ); - } - finally { - super.onDataChanged( change ); - } - } - }; + @Test(expected = TransactionCommitFailedException.class) + @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"}) + public void testRejectedCommit() throws Throwable { - domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH, - dcListener, DataChangeScope.BASE ); + commitExecutor.delegate = Mockito.mock(ExecutorService.class); + Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate) + .execute(Mockito.any(Runnable.class)); + Mockito.doNothing().when(commitExecutor.delegate).shutdown(); + Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow(); + Mockito.doReturn("").when(commitExecutor.delegate).toString(); + Mockito.doReturn(true).when(commitExecutor.delegate) + .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class)); - final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction(); - assertNotNull( writeTx ); - - writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) ); - - AtomicReference caughtEx = submitTxAsync( writeTx ); - - dcListener.waitForChange(); - - if( caughtEx.get() != null ) { - throw caughtEx.get(); - } + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - if( caughtCommitEx.get() != null ) { - throw caughtCommitEx.get(); + try { + writeTx.commit().get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); } } - AtomicReference submitTxAsync( final DOMDataWriteTransaction writeTx ) { + @SuppressWarnings("checkstyle:IllegalCatch") + AtomicReference submitTxAsync(final DOMDataWriteTransaction writeTx) { final AtomicReference caughtEx = new AtomicReference<>(); - new Thread() { - @Override - public void run() { - - try { - writeTx.submit(); - } catch( Throwable e ) { - caughtEx.set( e ); - } + new Thread(() -> { + try { + writeTx.commit(); + } catch (Throwable e) { + caughtEx.set(e); } - - }.start(); + }).start(); return caughtEx; } - static class TestDOMDataChangeListener implements DOMDataChangeListener { - - volatile AsyncDataChangeEvent> change; - private final CountDownLatch latch = new CountDownLatch( 1 ); - - @Override - public void onDataChanged( final AsyncDataChangeEvent> change ) { - this.change = change; - latch.countDown(); - } - - void waitForChange() throws InterruptedException { - assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) ); - } - } static class CommitExecutorService extends ForwardingExecutorService { ExecutorService delegate; - public CommitExecutorService( final ExecutorService delegate ) { + CommitExecutorService(final ExecutorService delegate) { this.delegate = delegate; }