/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
18 import org.opendaylight.mdsal.dom.api.DOMDataChangeListener;
19 import org.opendaylight.mdsal.dom.api.DOMDataReadTransaction;
20 import org.opendaylight.mdsal.dom.api.DOMDataReadWriteTransaction;
21 import org.opendaylight.mdsal.dom.api.DOMDataWriteTransaction;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.util.concurrent.ForwardingExecutorService;
26 import com.google.common.util.concurrent.FutureCallback;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.ListeningExecutorService;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import java.util.Collections;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.RejectedExecutionException;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39 import org.junit.After;
40 import org.junit.Before;
41 import org.junit.Test;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
44 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
45 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
46 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
47 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
48 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
49 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
51 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
52 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 public class DOMBrokerTest {
60 private SchemaContext schemaContext;
61 private AbstractDOMDataBroker domBroker;
62 private ListeningExecutorService executor;
63 private ExecutorService futureExecutor;
64 private CommitExecutorService commitExecutor;
67 public void setupStore() {
69 InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
70 MoreExecutors.newDirectExecutorService());
71 InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
72 MoreExecutors.newDirectExecutorService());
73 schemaContext = TestModel.createTestContext();
75 operStore.onGlobalContextUpdated(schemaContext);
76 configStore.onGlobalContextUpdated(schemaContext);
78 ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore> builder() //
79 .put(CONFIGURATION, configStore) //
80 .put(OPERATIONAL, operStore) //
83 commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
84 futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
85 executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
86 TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
87 domBroker = new SerializedDOMDataBroker(stores, executor);
91 public void tearDown() {
92 if( executor != null ) {
93 executor.shutdownNow();
96 if(futureExecutor != null) {
97 futureExecutor.shutdownNow();
102 public void testTransactionIsolation() throws InterruptedException, ExecutionException {
104 assertNotNull(domBroker);
106 DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
107 assertNotNull(readTx);
109 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
110 assertNotNull(writeTx);
113 * Writes /test in writeTx
116 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
120 * Reads /test from writeTx Read should return container.
123 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
124 TestModel.TEST_PATH);
125 assertTrue(writeTxContainer.get().isPresent());
129 * Reads /test from readTx Read should return Absent.
132 ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
133 .read(OPERATIONAL, TestModel.TEST_PATH);
134 assertFalse(readTxContainer.get().isPresent());
138 public void testTransactionCommit() throws InterruptedException, ExecutionException {
140 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
141 assertNotNull(writeTx);
144 * Writes /test in writeTx
147 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
151 * Reads /test from writeTx Read should return container.
154 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx.read(OPERATIONAL,
155 TestModel.TEST_PATH);
156 assertTrue(writeTxContainer.get().isPresent());
158 writeTx.submit().get();
160 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
161 .read(OPERATIONAL, TestModel.TEST_PATH).get();
162 assertTrue(afterCommitRead.isPresent());
165 @Test(expected=TransactionCommitFailedException.class)
166 public void testRejectedCommit() throws Exception {
168 commitExecutor.delegate = Mockito.mock( ExecutorService.class );
169 Mockito.doThrow( new RejectedExecutionException( "mock" ) )
170 .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
171 Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
172 Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
173 Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
174 Mockito.doReturn( true ).when( commitExecutor.delegate )
175 .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
177 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
178 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
180 writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
184 * Tests a simple DataChangeListener notification after a write.
187 public void testDataChangeListener() throws Throwable {
189 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode( TestModel.TEST_QNAME );
191 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
193 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
194 dcListener, DataChangeScope.BASE );
196 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
197 assertNotNull( writeTx );
199 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, testNode );
201 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
203 dcListener.waitForChange();
205 if( caughtEx.get() != null ) {
206 throw caughtEx.get();
209 NormalizedNode<?, ?> actualNode = dcListener.change.getCreatedData().get( TestModel.TEST_PATH );
210 assertEquals( "Created node", testNode, actualNode );
214 * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
215 * This should succeed without deadlock.
218 public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
220 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
221 final CountDownLatch commitCompletedLatch = new CountDownLatch( 1 );
223 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
225 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
227 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
228 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
229 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
230 Futures.addCallback( writeTx.submit(), new FutureCallback<Void>() {
232 public void onSuccess( final Void result ) {
233 commitCompletedLatch.countDown();
237 public void onFailure( final Throwable t ) {
238 caughtCommitEx.set( t );
239 commitCompletedLatch.countDown();
243 super.onDataChanged( change );
247 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
248 dcListener, DataChangeScope.BASE );
250 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
251 assertNotNull( writeTx );
253 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
255 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
257 dcListener.waitForChange();
259 if( caughtEx.get() != null ) {
260 throw caughtEx.get();
263 assertTrue( "Commit Future was not invoked", commitCompletedLatch.await( 5, TimeUnit.SECONDS ) );
265 if( caughtCommitEx.get() != null ) {
266 throw caughtCommitEx.get();
271 * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
272 * This should throw an exception and not deadlock.
274 @Test(expected=TransactionCommitDeadlockException.class)
275 public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
277 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
279 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
281 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
282 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
283 writeTx.put( OPERATIONAL, TestModel.TEST2_PATH,
284 ImmutableNodes.containerNode( TestModel.TEST2_QNAME ) );
286 writeTx.submit().get();
287 } catch( ExecutionException e ) {
288 caughtCommitEx.set( e.getCause() );
289 } catch( Exception e ) {
290 caughtCommitEx.set( e );
293 super.onDataChanged( change );
298 domBroker.registerDataChangeListener( OPERATIONAL, TestModel.TEST_PATH,
299 dcListener, DataChangeScope.BASE );
301 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
302 assertNotNull( writeTx );
304 writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode( TestModel.TEST_QNAME ) );
306 AtomicReference<Throwable> caughtEx = submitTxAsync( writeTx );
308 dcListener.waitForChange();
310 if( caughtEx.get() != null ) {
311 throw caughtEx.get();
314 if( caughtCommitEx.get() != null ) {
315 throw caughtCommitEx.get();
// Helper used by the listener tests: takes the write transaction to submit and has the
// declared return type AtomicReference<Throwable>. Callers read that reference after the
// listener has fired and rethrow its content when it is non-null.
// NOTE(review): the statements between the caughtEx declaration and the catch clause, and
// everything after the catch header, are not visible here. Presumably the submit runs off the
// calling thread and the catch stores the Throwable into caughtEx; confirm against the full file.
319 AtomicReference<Throwable> submitTxAsync( final DOMDataWriteTransaction writeTx ) {
320 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
327 } catch( Throwable e ) {
337 static class TestDOMDataChangeListener implements DOMDataChangeListener {
339 volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
340 private final CountDownLatch latch = new CountDownLatch( 1 );
343 public void onDataChanged( final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change ) {
344 this.change = change;
348 void waitForChange() throws InterruptedException {
349 assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
353 static class CommitExecutorService extends ForwardingExecutorService {
355 ExecutorService delegate;
357 public CommitExecutorService( final ExecutorService delegate ) {
358 this.delegate = delegate;
362 protected ExecutorService delegate() {