2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
18 import com.google.common.base.Optional;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.util.concurrent.ForwardingExecutorService;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.ListeningExecutorService;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.Collections;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.concurrent.atomic.AtomicReference;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Test;
38 import org.mockito.Mockito;
39 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
40 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
41 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
42 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
43 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
45 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
46 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
47 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
48 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
49 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
51 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
52 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 public class DOMBrokerTest {
60 private SchemaContext schemaContext;
61 private AbstractDOMDataBroker domBroker;
62 private ListeningExecutorService executor;
63 private ExecutorService futureExecutor;
64 private CommitExecutorService commitExecutor;
67 public void setupStore() {
69 InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
70 InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService());
71 schemaContext = TestModel.createTestContext();
73 operStore.onGlobalContextUpdated(schemaContext);
74 configStore.onGlobalContextUpdated(schemaContext);
76 final ImmutableMap<LogicalDatastoreType, DOMStore> stores =
77 ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
78 .put(CONFIGURATION, configStore) //
79 .put(OPERATIONAL, operStore) //
82 commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
83 futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB", DOMBrokerTest.class);
84 executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
85 TransactionCommitDeadlockException
86 .DEADLOCK_EXCEPTION_SUPPLIER,
88 domBroker = new SerializedDOMDataBroker(stores, executor);
92 public void tearDown() {
93 if (executor != null) {
94 executor.shutdownNow();
97 if (futureExecutor != null) {
98 futureExecutor.shutdownNow();
102 @Test(timeout = 10000)
103 public void testTransactionIsolation() throws InterruptedException, ExecutionException {
105 assertNotNull(domBroker);
107 DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
108 assertNotNull(readTx);
110 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
111 assertNotNull(writeTx);
114 * Writes /test in writeTx
117 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
120 * Reads /test from writeTx Read should return container.
123 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
124 .read(OPERATIONAL, TestModel.TEST_PATH);
125 assertTrue(writeTxContainer.get().isPresent());
128 * Reads /test from readTx Read should return Absent.
131 ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
132 .read(OPERATIONAL, TestModel.TEST_PATH);
133 assertFalse(readTxContainer.get().isPresent());
136 @Test(timeout = 10000)
137 public void testTransactionCommit() throws InterruptedException, ExecutionException, TimeoutException {
139 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
140 assertNotNull(writeTx);
143 * Writes /test in writeTx
146 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
149 * Reads /test from writeTx Read should return container.
152 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
153 .read(OPERATIONAL, TestModel.TEST_PATH);
154 assertTrue(writeTxContainer.get().isPresent());
156 writeTx.commit().get(5, TimeUnit.SECONDS);
158 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
159 .read(OPERATIONAL, TestModel.TEST_PATH).get();
160 assertTrue(afterCommitRead.isPresent());
163 @Test(timeout = 10000)
165 public void testTransactionSubmit() throws InterruptedException, ExecutionException, TimeoutException {
167 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
168 assertNotNull(writeTx);
171 * Writes /test in writeTx
174 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
177 * Reads /test from writeTx Read should return container.
180 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
181 .read(OPERATIONAL, TestModel.TEST_PATH);
182 assertTrue(writeTxContainer.get().isPresent());
184 writeTx.submit().get(5, TimeUnit.SECONDS);
186 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
187 .read(OPERATIONAL, TestModel.TEST_PATH).get();
188 assertTrue(afterCommitRead.isPresent());
191 @Test(expected = TransactionCommitFailedException.class)
192 @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
193 public void testRejectedCommit() throws Throwable {
195 commitExecutor.delegate = Mockito.mock(ExecutorService.class);
196 Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate)
197 .execute(Mockito.any(Runnable.class));
198 Mockito.doNothing().when(commitExecutor.delegate).shutdown();
199 Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow();
200 Mockito.doReturn("").when(commitExecutor.delegate).toString();
201 Mockito.doReturn(true).when(commitExecutor.delegate)
202 .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class));
204 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
205 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
208 writeTx.commit().get(5, TimeUnit.SECONDS);
209 } catch (ExecutionException e) {
215 * Tests a simple DataChangeListener notification after a write.
218 @SuppressWarnings("checkstyle:IllegalThrows")
219 public void testDataChangeListener() throws Throwable {
221 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
223 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
225 domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
227 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
228 assertNotNull(writeTx);
230 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, testNode);
232 AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
234 dcListener.waitForChange();
236 if (caughtEx.get() != null) {
237 throw caughtEx.get();
240 NormalizedNode<?, ?> actualNode = dcListener.capturedChange.getCreatedData().get(TestModel.TEST_PATH);
241 assertEquals("Created node", testNode, actualNode);
245 * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
246 * This should succeed without deadlock.
249 @SuppressWarnings("checkstyle:IllegalThrows")
250 public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
252 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
253 final CountDownLatch commitCompletedLatch = new CountDownLatch(1);
255 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
257 public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
259 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
260 writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
261 Futures.addCallback(writeTx.submit(), new FutureCallback<Void>() {
263 public void onSuccess(final Void result) {
264 commitCompletedLatch.countDown();
268 public void onFailure(final Throwable throwable) {
269 caughtCommitEx.set(throwable);
270 commitCompletedLatch.countDown();
272 }, MoreExecutors.directExecutor());
274 super.onDataChanged(change);
278 domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
280 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
281 assertNotNull(writeTx);
283 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
285 AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
287 dcListener.waitForChange();
289 if (caughtEx.get() != null) {
290 throw caughtEx.get();
293 assertTrue("Commit Future was not invoked", commitCompletedLatch.await(5, TimeUnit.SECONDS));
295 if (caughtCommitEx.get() != null) {
296 throw caughtCommitEx.get();
301 * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
302 * This should throw an exception and not deadlock.
304 @Test(expected = TransactionCommitDeadlockException.class)
305 @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:IllegalCatch"})
306 public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
308 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
310 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
312 public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
313 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
314 writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
316 writeTx.commit().get();
317 } catch (ExecutionException e) {
318 caughtCommitEx.set(e.getCause());
319 } catch (Exception e) {
320 caughtCommitEx.set(e);
322 super.onDataChanged(change);
327 domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
329 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
330 assertNotNull(writeTx);
332 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
334 AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
336 dcListener.waitForChange();
338 if (caughtEx.get() != null) {
339 throw caughtEx.get();
342 if (caughtCommitEx.get() != null) {
343 throw caughtCommitEx.get();
347 @SuppressWarnings("checkstyle:IllegalCatch")
348 AtomicReference<Throwable> submitTxAsync(final DOMDataWriteTransaction writeTx) {
349 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
353 } catch (Throwable e) {
361 static class TestDOMDataChangeListener implements DOMDataChangeListener {
363 volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> capturedChange;
364 private final CountDownLatch latch = new CountDownLatch(1);
367 public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
368 this.capturedChange = change;
372 void waitForChange() throws InterruptedException {
373 assertTrue("onDataChanged was not called", latch.await(5, TimeUnit.SECONDS));
377 static class CommitExecutorService extends ForwardingExecutorService {
379 ExecutorService delegate;
381 CommitExecutorService(final ExecutorService delegate) {
382 this.delegate = delegate;
386 protected ExecutorService delegate() {