1 package org.opendaylight.controller.md.sal.dom.broker.impl;
3 import static org.junit.Assert.assertFalse;
4 import static org.junit.Assert.assertNotNull;
5 import static org.junit.Assert.assertTrue;
6 import static org.junit.Assert.assertEquals;
7 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
8 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
10 import java.util.concurrent.CountDownLatch;
11 import java.util.concurrent.ExecutionException;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.TimeUnit;
14 import java.util.concurrent.atomic.AtomicReference;
16 import org.junit.After;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
23 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
24 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
25 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
27 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
28 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
30 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
36 import com.google.common.base.Optional;
37 import com.google.common.collect.ImmutableMap;
38 import com.google.common.util.concurrent.FutureCallback;
39 import com.google.common.util.concurrent.Futures;
40 import com.google.common.util.concurrent.ListenableFuture;
41 import com.google.common.util.concurrent.ListeningExecutorService;
42 import com.google.common.util.concurrent.MoreExecutors;
44 public class DOMBrokerTest {
// Schema context; assigned in setupStore() from TestModel.createTestContext().
46 private SchemaContext schemaContext;
// Broker under test; constructed in setupStore() from the datastore map and the executor.
47 private DOMDataBrokerImpl domBroker;
// Executor passed to the broker in setupStore() and stopped in tearDown().
48 private ListeningExecutorService executor;
// Test fixture setup: creates two in-memory DOM stores ("OPER" and "CFG"), gives both the
// test schema, and builds the DOMDataBrokerImpl under test on top of them.
// NOTE(review): the annotation on this method is not visible here; presumably @Before
// (it is imported above) - confirm.
51 public void setupStore() {
// Each store is constructed with a same-thread executor from Guava's MoreExecutors.
52 InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
53 InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
54 schemaContext = TestModel.createTestContext();
// Hand the same schema context to both stores before they are wired into the broker.
56 operStore.onGlobalContextUpdated(schemaContext);
57 configStore.onGlobalContextUpdated(schemaContext);
// Map each logical datastore type to its backing store.
// NOTE(review): the terminating call of this builder chain is not visible here - confirm.
59 ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
60 .put(CONFIGURATION, configStore) //
61 .put(OPERATIONAL, operStore) //
// The broker's executor is a single-thread executor wrapped in
// DeadlockDetectingListeningExecutorService, configured with
// TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION.
64 executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
65 TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
66 domBroker = new DOMDataBrokerImpl(stores, executor);
// Test fixture teardown: stops the executor created in setupStore() via shutdownNow().
// NOTE(review): the annotation on this method is not visible here; presumably @After
// (it is imported above) - confirm.
70 public void tearDown() {
// executor is assigned only in setupStore(), so it can still be null at this point.
71 if( executor != null ) {
72 executor.shutdownNow();
// Checks isolation between transactions: a node put into an open read-write transaction
// is readable through that transaction, but a read-only transaction created earlier does
// not see it. The write transaction is not submitted in the lines shown.
77 public void testTransactionIsolation() throws InterruptedException, ExecutionException {
79 assertNotNull(domBroker);
// The read-only transaction is created before anything is written.
81 DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
82 assertNotNull(readTx);
84 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
85 assertNotNull(writeTx);
88 * Writes /test in writeTx
91 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
95 * Reads /test from writeTx Read should return container.
// NOTE(review): the rest of this read() call's argument list is on a line not visible
// here; presumably TestModel.TEST_PATH, as in the read on readTx below - confirm.
98 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
// get() blocks until the read future completes.
100 assertTrue(writeTxContainer.get().isPresent());
104 * Reads /test from readTx Read should return Absent.
107 ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
108 .read(OPERATIONAL, TestModel.TEST_PATH);
109 assertFalse(readTxContainer.get().isPresent());
// Checks commit visibility: a node put into a read-write transaction is readable through
// that transaction and, after submit() has completed, through a new read-only transaction.
113 public void testTransactionCommit() throws InterruptedException, ExecutionException {
115 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
116 assertNotNull(writeTx);
119 * Writes /test in writeTx
122 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
126 * Reads /test from writeTx Read should return container.
129 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
130 TestModel.TEST_PATH);
131 assertTrue(writeTxContainer.get().isPresent());
// Block on the submit future so the read below happens after the commit has finished.
133 writeTx.submit().get();
// Read the same path again through a brand-new read-only transaction.
135 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
136 .read(OPERATIONAL, TestModel.TEST_PATH).get();
137 assertTrue(afterCommitRead.isPresent());
141 * Tests a simple DataChangeListener notification after a write.
// Registers a listener on TestModel.TEST_PATH in the operational store with BASE scope,
// writes a container node at that path via submitTxAsync(), waits for the listener to be
// called and asserts that the event's created data holds the written node.
144 public void testDataChangeListener() throws Throwable {
146 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
148 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
150 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
151 dcListener, DataChangeScope.BASE );
153 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
154 assertNotNull( writeTx );
156 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
// submitTxAsync() returns a holder for any Throwable caught while submitting.
158 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
// Fails the test if the listener is not notified within waitForChange()'s 5 second timeout.
160 dcListener.waitForChange();
// Rethrow a failure captured by submitTxAsync() so it fails this test.
162 if( caughtEx.get() != null ) {
163 throw caughtEx.get();
// The event recorded by the listener must report the written node as created at TEST_PATH.
166 NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
167 assertEquals( "Created node", testNode, actualNode );
171 * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
172 * This should succeed without deadlock.
// Registers a listener whose onDataChanged() submits a second write transaction (on
// TestModel.TEST2_PATH) and attaches a callback instead of blocking on the result. The test
// then checks that the callback fired within 5 seconds and did not report a failure.
175 public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
// Holds the failure, if any, reported to the nested submit's callback.
177 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
// Counted down by both onSuccess and onFailure of the nested submit's callback.
178 final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
180 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
182 public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
// Start a new write transaction from inside the notification callback.
184 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
185 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
186 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
// Non-blocking: the outcome is delivered to the callback rather than waited for here.
187 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
189 public void onSuccess( Void result ) {
190 commitCompletedLatch.countDown();
194 public void onFailure( Throwable t ) {
195 caughtCommitEx.set( t );
196 commitCompletedLatch.countDown();
// Let the base listener handle the event as well (it stores the change).
200 super.onDataChanged( change );
204 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
205 dcListener, DataChangeScope.BASE );
// Outer write on TEST_PATH, the path the listener is registered on.
207 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
208 assertNotNull( writeTx );
210 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
212 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
214 dcListener.waitForChange();
// Rethrow a failure captured while submitting the outer transaction.
216 if( caughtEx.get() != null ) {
217 throw caughtEx.get();
// The nested submit's callback must have run within 5 seconds.
220 assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
// Rethrow a failure reported to the nested submit's onFailure.
222 if( caughtCommitEx.get() != null ) {
223 throw caughtCommitEx.get();
228 * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
229 * This should throw an exception and not deadlock.
// Registers a listener whose onDataChanged() submits a second write transaction and blocks
// on its future with get(). The test passes only if a TransactionCommitDeadlockException
// propagates out of this method (see the expected attribute below).
231 @Test(expected=TransactionCommitDeadlockException.class)
232 public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
// Holds whatever the blocking nested submit threw inside the listener.
234 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
236 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
238 public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
239 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
240 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
241 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
// Blocking wait on the nested submit, performed from inside the notification callback.
243 writeTx.submit().get();
// For an ExecutionException, record its cause rather than the wrapper.
244 } catch( ExecutionException e ) {
245 caughtCommitEx.set( e.getCause() );
// Any other exception is recorded as-is.
246 } catch( Exception e ) {
247 caughtCommitEx.set( e );
// Let the base listener handle the event as well (it stores the change).
250 super.onDataChanged( change );
255 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
256 dcListener, DataChangeScope.BASE );
// Outer write on TEST_PATH, the path the listener is registered on.
258 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
259 assertNotNull( writeTx );
261 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
263 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
265 dcListener.waitForChange();
// Rethrow a failure captured while submitting the outer transaction.
267 if( caughtEx.get() != null ) {
268 throw caughtEx.get();
// Rethrow what the listener recorded; this is where the expected exception would surface.
271 if( caughtCommitEx.get() != null ) {
272 throw caughtCommitEx.get();
// Helper used by the listener tests to submit a write transaction. It returns an
// AtomicReference that callers check after the listener has fired: a non-null value is
// rethrown as the test failure.
// NOTE(review): most of this method's body is not visible here. Judging by the name and the
// catch below, it presumably submits writeTx on another thread and stores any Throwable
// in caughtEx - confirm.
276 AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
277 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
// Catches Throwable, so Errors are caught as well as Exceptions.
284 } catch( Throwable e ) {
// Listener used by the tests: stores the change event it receives and lets a test wait,
// with a timeout, until a notification has arrived.
294 static class TestDOMDataChangeListener implements DOMDataChangeListener {
// Event passed to onDataChanged(); volatile, and read by the test code after waitForChange().
296 volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
// One-shot latch that waitForChange() awaits.
297 private final CountDownLatch latch = new CountDownLatch( 1 );
300 public void onDataChanged( AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
// NOTE(review): no latch count-down is visible in this method here; presumably it
// follows this assignment, otherwise waitForChange() could never succeed - confirm.
301 this.change = change;
// Waits up to 5 seconds for the latch and fails the calling test with an assertion error
// on timeout.
305 void waitForChange() throws InterruptedException {
306 assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );