package org.opendaylight.mdsal.dom.broker.test;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStore;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
-import org.opendaylight.mdsal.common.api.AsyncDataChangeEvent;
+
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.common.api.TransactionCommitDeadlockException;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
-import org.opendaylight.mdsal.common.api.AsyncDataBroker.DataChangeScope;
import org.opendaylight.mdsal.dom.broker.AbstractDOMDataBroker;
import org.opendaylight.mdsal.dom.broker.SerializedDOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ForwardingExecutorService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.mockito.Mockito;
import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
}
- /**
- * Tests a simple DataChangeListener notification after a write.
- */
- @Test
- public void testDataChangeListener() throws Throwable {
-
- final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
-
- TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
-
- domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
- dcListener, DataChangeScope.BASE );
-
- final DOMDataTreeWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- assertNotNull( writeTx );
-
- writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
-
- AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
-
- dcListener.waitForChange();
-
- if( caughtEx.get() != null ) {
- throw caughtEx.get();
- }
-
- NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
- assertEquals( "Created node", testNode, actualNode );
- }
-
- /**
- * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
- * This should succeed without deadlock.
- */
- @Test
- public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
-
- final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
- final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
-
- TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
- @Override
- public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
-
- DOMDataTreeWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
- ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
- Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
- @Override
- public void onSuccess( final Void result ) {
- commitCompletedLatch.countDown();
- }
-
- @Override
- public void onFailure( final Throwable t ) {
- caughtCommitEx.set( t );
- commitCompletedLatch.countDown();
- }
- } );
-
- super.onDataChanged( change );
- }
- };
-
- domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
- dcListener, DataChangeScope.BASE );
-
- final DOMDataTreeWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- assertNotNull( writeTx );
-
- writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
-
- AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
-
- dcListener.waitForChange();
-
- if( caughtEx.get() != null ) {
- throw caughtEx.get();
- }
-
- assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
-
- if( caughtCommitEx.get() != null ) {
- throw caughtCommitEx.get();
- }
- }
-
- /**
- * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
- * This should throw an exception and not deadlock.
- */
- @Test(expected=TransactionCommitDeadlockException.class)
- public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
-
- final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
-
- TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
- @Override
- public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
- DOMDataTreeWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
- ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
- try {
- writeTx.submit().get();
- } catch( ExecutionException e ) {
- caughtCommitEx.set( e.getCause() );
- } catch( Exception e ) {
- caughtCommitEx.set( e );
- }
- finally {
- super.onDataChanged( change );
- }
- }
- };
-
- domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
- dcListener, DataChangeScope.BASE );
-
- final DOMDataTreeWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
- assertNotNull( writeTx );
-
- writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
-
- AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
-
- dcListener.waitForChange();
-
- if( caughtEx.get() != null ) {
- throw caughtEx.get();
- }
-
- if( caughtCommitEx.get() != null ) {
- throw caughtCommitEx.get();
- }
- }
-
AtomicReference<Throwable> submitTxAsync( final DOMDataTreeWriteTransaction writeTx ) {
final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
new Thread() {
return caughtEx;
}
- static class TestDOMDataChangeListener implements DOMDataChangeListener {
-
- volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
- private final CountDownLatch latch = new CountDownLatch( 1 );
-
- @Override
- public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
- this.change = change;
- latch.countDown();
- }
-
- void waitForChange() throws InterruptedException {
- assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
- }
- }
-
static class CommitExecutorService extends ForwardingExecutorService {
ExecutorService delegate;