/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.util.concurrent.ForwardingExecutorService;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import java.util.Collections;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.RejectedExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.mockito.Mockito;
37 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
38 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
39 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
40 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
41 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
42 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
45 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
46 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
47 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
48 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
49 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
50 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
53 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 public class DOMBrokerTest {
58 private SchemaContext schemaContext;
59 private AbstractDOMDataBroker domBroker;
60 private ListeningExecutorService executor;
61 private ExecutorService futureExecutor;
62 private CommitExecutorService commitExecutor;
65 public void setupStore() {
67 InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
68 MoreExecutors.newDirectExecutorService());
69 InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
70 MoreExecutors.newDirectExecutorService());
71 schemaContext = TestModel.createTestContext();
73 operStore.onGlobalContextUpdated(schemaContext);
74 configStore.onGlobalContextUpdated(schemaContext);
76 ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
77 .put(CONFIGURATION, configStore) //
78 .put(OPERATIONAL, operStore) //
81 commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
82 futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
83 executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
84 TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
85 domBroker = new SerializedDOMDataBroker(stores, executor);
89 public void tearDown() {
90 if( executor != null ) {
91 executor.shutdownNow();
94 if(futureExecutor != null) {
95 futureExecutor.shutdownNow();
100 public void testTransactionIsolation() throws InterruptedException, ExecutionException {
102 assertNotNull(domBroker);
104 DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
105 assertNotNull(readTx);
107 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
108 assertNotNull(writeTx);
111 * Writes /test in writeTx
114 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
118 * Reads /test from writeTx Read should return container.
121 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
122 TestModel.TEST_PATH);
123 assertTrue(writeTxContainer.get().isPresent());
127 * Reads /test from readTx Read should return Absent.
130 ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
131 .read(OPERATIONAL, TestModel.TEST_PATH);
132 assertFalse(readTxContainer.get().isPresent());
136 public void testTransactionCommit() throws InterruptedException, ExecutionException {
138 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
139 assertNotNull(writeTx);
142 * Writes /test in writeTx
145 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
149 * Reads /test from writeTx Read should return container.
152 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
153 TestModel.TEST_PATH);
154 assertTrue(writeTxContainer.get().isPresent());
156 writeTx.submit().get();
158 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
159 .read(OPERATIONAL, TestModel.TEST_PATH).get();
160 assertTrue(afterCommitRead.isPresent());
163 @Test(expected=TransactionCommitFailedException.class)
164 public void testRejectedCommit() throws Exception {
166 commitExecutor.delegate = Mockito.mock( ExecutorService.class );
167 Mockito.doThrow( new RejectedExecutionException( "mock" ) )
168 .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
169 Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
170 Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
171 Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
172 Mockito.doReturn( true ).when( commitExecutor.delegate )
173 .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
175 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
176 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
178 writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
182 * Tests a simple DataChangeListener notification after a write.
185 public void testDataChangeListener() throws Throwable {
187 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
189 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
191 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
192 dcListener, DataChangeScope.BASE );
194 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
195 assertNotNull( writeTx );
197 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
199 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
201 dcListener.waitForChange();
203 if( caughtEx.get() != null ) {
204 throw caughtEx.get();
207 NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
208 assertEquals( "Created node", testNode, actualNode );
212 * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
213 * This should succeed without deadlock.
216 public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
218 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
219 final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
221 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
223 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
225 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
226 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
227 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
228 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
230 public void onSuccess( final Void result ) {
231 commitCompletedLatch.countDown();
235 public void onFailure( final Throwable t ) {
236 caughtCommitEx.set( t );
237 commitCompletedLatch.countDown();
241 super.onDataChanged( change );
245 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
246 dcListener, DataChangeScope.BASE );
248 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
249 assertNotNull( writeTx );
251 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
253 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
255 dcListener.waitForChange();
257 if( caughtEx.get() != null ) {
258 throw caughtEx.get();
261 assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
263 if( caughtCommitEx.get() != null ) {
264 throw caughtCommitEx.get();
269 * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
270 * This should throw an exception and not deadlock.
272 @Test(expected=TransactionCommitDeadlockException.class)
273 public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
275 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
277 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
279 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
280 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
281 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
282 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
284 writeTx.submit().get();
285 } catch( ExecutionException e ) {
286 caughtCommitEx.set( e.getCause() );
287 } catch( Exception e ) {
288 caughtCommitEx.set( e );
291 super.onDataChanged( change );
296 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
297 dcListener, DataChangeScope.BASE );
299 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
300 assertNotNull( writeTx );
302 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
304 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
306 dcListener.waitForChange();
308 if( caughtEx.get() != null ) {
309 throw caughtEx.get();
312 if( caughtCommitEx.get() != null ) {
313 throw caughtCommitEx.get();
317 AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
318 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
322 } catch (Throwable e) {
330 static class TestDOMDataChangeListener implements DOMDataChangeListener {
332 volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
333 private final CountDownLatch latch = new CountDownLatch( 1 );
336 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
337 this.change = change;
341 void waitForChange() throws InterruptedException {
342 assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
346 static class CommitExecutorService extends ForwardingExecutorService {
348 ExecutorService delegate;
350 public CommitExecutorService( final ExecutorService delegate ) {
351 this.delegate = delegate;
355 protected ExecutorService delegate() {