2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
16 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
18 import com.google.common.base.Optional;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.util.concurrent.ForwardingExecutorService;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.ListeningExecutorService;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import java.util.Collections;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicReference;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.mockito.Mockito;
38 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
39 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
40 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
41 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
42 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
43 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
44 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
45 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
46 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
47 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
48 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
50 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
51 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
54 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 public class DOMBrokerTest {
59 private SchemaContext schemaContext;
60 private AbstractDOMDataBroker domBroker;
61 private ListeningExecutorService executor;
62 private ExecutorService futureExecutor;
63 private CommitExecutorService commitExecutor;
66 public void setupStore() {
68 InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
69 InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService());
70 schemaContext = TestModel.createTestContext();
72 operStore.onGlobalContextUpdated(schemaContext);
73 configStore.onGlobalContextUpdated(schemaContext);
75 final ImmutableMap<LogicalDatastoreType, DOMStore> stores =
76 ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
77 .put(CONFIGURATION, configStore) //
78 .put(OPERATIONAL, operStore) //
81 commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
82 futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB", DOMBrokerTest.class);
83 executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
84 TransactionCommitDeadlockException
85 .DEADLOCK_EXCEPTION_SUPPLIER,
87 domBroker = new SerializedDOMDataBroker(stores, executor);
91 public void tearDown() {
92 if (executor != null) {
93 executor.shutdownNow();
96 if (futureExecutor != null) {
97 futureExecutor.shutdownNow();
101 @Test(timeout = 10000)
102 public void testTransactionIsolation() throws InterruptedException, ExecutionException {
104 assertNotNull(domBroker);
106 DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
107 assertNotNull(readTx);
109 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
110 assertNotNull(writeTx);
113 * Writes /test in writeTx
116 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
119 * Reads /test from writeTx Read should return container.
122 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
123 .read(OPERATIONAL, TestModel.TEST_PATH);
124 assertTrue(writeTxContainer.get().isPresent());
127 * Reads /test from readTx Read should return Absent.
130 ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
131 .read(OPERATIONAL, TestModel.TEST_PATH);
132 assertFalse(readTxContainer.get().isPresent());
135 @Test(timeout = 10000)
136 public void testTransactionCommit() throws InterruptedException, ExecutionException {
138 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
139 assertNotNull(writeTx);
142 * Writes /test in writeTx
145 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
148 * Reads /test from writeTx Read should return container.
151 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
152 .read(OPERATIONAL, TestModel.TEST_PATH);
153 assertTrue(writeTxContainer.get().isPresent());
155 writeTx.submit().get();
157 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
158 .read(OPERATIONAL, TestModel.TEST_PATH).get();
159 assertTrue(afterCommitRead.isPresent());
162 @Test(expected = TransactionCommitFailedException.class)
163 public void testRejectedCommit() throws Exception {
165 commitExecutor.delegate = Mockito.mock(ExecutorService.class);
166 Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate)
167 .execute(Mockito.any(Runnable.class));
168 Mockito.doNothing().when(commitExecutor.delegate).shutdown();
169 Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow();
170 Mockito.doReturn("").when(commitExecutor.delegate).toString();
171 Mockito.doReturn(true).when(commitExecutor.delegate)
172 .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class));
174 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
175 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
177 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
181 * Tests a simple DataChangeListener notification after a write.
184 @SuppressWarnings("checkstyle:IllegalThrows")
185 public void testDataChangeListener() throws Throwable {
187 final NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
189 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener();
191 domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
193 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
194 assertNotNull(writeTx);
196 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, testNode);
198 AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
200 dcListener.waitForChange();
202 if (caughtEx.get() != null) {
203 throw caughtEx.get();
206 NormalizedNode<?, ?> actualNode = dcListener.capturedChange.getCreatedData().get(TestModel.TEST_PATH);
207 assertEquals("Created node", testNode, actualNode);
211 * Tests a DataChangeListener that does an async submit of a write Tx in its onDataChanged method.
212 * This should succeed without deadlock.
215 @SuppressWarnings("checkstyle:IllegalThrows")
216 public void testDataChangeListenerDoingAsyncWriteTxSubmit() throws Throwable {
218 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
219 final CountDownLatch commitCompletedLatch = new CountDownLatch(1);
221 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
223 public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
225 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
226 writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
227 Futures.addCallback(writeTx.submit(), new FutureCallback<Void>() {
229 public void onSuccess(final Void result) {
230 commitCompletedLatch.countDown();
234 public void onFailure(final Throwable throwable) {
235 caughtCommitEx.set(throwable);
236 commitCompletedLatch.countDown();
240 super.onDataChanged(change);
244 domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
246 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
247 assertNotNull(writeTx);
249 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
251 AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
253 dcListener.waitForChange();
255 if (caughtEx.get() != null) {
256 throw caughtEx.get();
259 assertTrue("Commit Future was not invoked", commitCompletedLatch.await(5, TimeUnit.SECONDS));
261 if (caughtCommitEx.get() != null) {
262 throw caughtCommitEx.get();
267 * Tests a DataChangeListener that does a blocking submit of a write Tx in its onDataChanged method.
268 * This should throw an exception and not deadlock.
270 @Test(expected = TransactionCommitDeadlockException.class)
271 @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:IllegalCatch"})
272 public void testDataChangeListenerDoingBlockingWriteTxSubmit() throws Throwable {
274 final AtomicReference<Throwable> caughtCommitEx = new AtomicReference<>();
276 TestDOMDataChangeListener dcListener = new TestDOMDataChangeListener() {
278 public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
279 DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
280 writeTx.put(OPERATIONAL, TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME));
282 writeTx.submit().get();
283 } catch (ExecutionException e) {
284 caughtCommitEx.set(e.getCause());
285 } catch (Exception e) {
286 caughtCommitEx.set(e);
288 super.onDataChanged(change);
293 domBroker.registerDataChangeListener(OPERATIONAL, TestModel.TEST_PATH, dcListener, DataChangeScope.BASE);
295 final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
296 assertNotNull(writeTx);
298 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
300 AtomicReference<Throwable> caughtEx = submitTxAsync(writeTx);
302 dcListener.waitForChange();
304 if (caughtEx.get() != null) {
305 throw caughtEx.get();
308 if (caughtCommitEx.get() != null) {
309 throw caughtCommitEx.get();
313 @SuppressWarnings("checkstyle:IllegalCatch")
314 AtomicReference<Throwable> submitTxAsync(final DOMDataWriteTransaction writeTx) {
315 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
319 } catch (Throwable e) {
327 static class TestDOMDataChangeListener implements DOMDataChangeListener {
329 volatile AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> capturedChange;
330 private final CountDownLatch latch = new CountDownLatch(1);
333 public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
334 this.capturedChange = change;
338 void waitForChange() throws InterruptedException {
339 assertTrue("onDataChanged was not called", latch.await(5, TimeUnit.SECONDS));
343 static class CommitExecutorService extends ForwardingExecutorService {
345 ExecutorService delegate;
347 CommitExecutorService(final ExecutorService delegate) {
348 this.delegate = delegate;
352 protected ExecutorService delegate() {