2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.md.sal.dom.broker.impl;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
15 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.util.concurrent.ForwardingExecutorService;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.ListeningExecutorService;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import java.util.Collections;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.RejectedExecutionException;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
37 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
38 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
39 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
40 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
41 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
42 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
43 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
44 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
45 import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
48 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Tests for a DOM data broker assembled in {@code setupStore()}: a
 * {@code SerializedDOMDataBroker} created from a map of two {@code InMemoryDOMDataStore}
 * instances (CONFIGURATION and OPERATIONAL) and a deadlock-detecting commit executor.
 */
50 public class DOMBrokerTest {
// Schema context obtained from TestModel.createTestContext() and pushed to both stores.
52 private SchemaContext schemaContext;
// Broker under test; assigned a SerializedDOMDataBroker in setupStore().
53 private AbstractDOMDataBroker domBroker;
// DeadlockDetectingListeningExecutorService wrapping commitExecutor; handed to the broker.
54 private ListeningExecutorService executor;
// Pool created by SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB", ...).
55 private ExecutorService futureExecutor;
// Forwarding wrapper around the commit executor; testRejectedCommit swaps its delegate for a mock.
56 private CommitExecutorService commitExecutor;
/**
 * Builds the fixture used by every test: two in-memory stores, a schema context shared by
 * both, the executors, and finally the broker itself.
 *
 * <p>NOTE(review): the embedded line numbering has gaps in this method (for example 71 to 74
 * and 78 to 80), so some short lines are not visible in this view, presumably the builder's
 * terminating call, the last constructor argument and the closing brace. Confirm against the
 * full file.
 */
59 public void setupStore() {
// Both stores are created with MoreExecutors.newDirectExecutorService().
61 InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
62 InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.newDirectExecutorService());
63 schemaContext = TestModel.createTestContext();
// Hand the same schema context to both stores.
65 operStore.onGlobalContextUpdated(schemaContext);
66 configStore.onGlobalContextUpdated(schemaContext);
// Map each logical datastore type to its backing store.
68 final ImmutableMap<LogicalDatastoreType, DOMStore> stores =
69 ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
70 .put(CONFIGURATION, configStore) //
71 .put(OPERATIONAL, operStore) //
// commitExecutor wraps a single-thread executor so tests can later replace its delegate.
74 commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
75 futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB", DOMBrokerTest.class);
// The executor given to the broker wraps commitExecutor and is configured with the
// TransactionCommitDeadlockException supplier.
76 executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
77 TransactionCommitDeadlockException
78 .DEADLOCK_EXCEPTION_SUPPLIER,
80 domBroker = new SerializedDOMDataBroker(stores, executor);
/**
 * Stops the executors created in {@code setupStore()} by calling {@code shutdownNow()} on
 * each one that is non-null.
 */
84 public void tearDown() {
// Null checks guard against a fixture that was never (fully) initialised.
85 if (executor != null) {
86 executor.shutdownNow();
89 if (futureExecutor != null) {
90 futureExecutor.shutdownNow();
/**
 * Checks isolation between transactions: a node written through a read-write transaction is
 * readable through that same transaction, but is absent when read through a read-only
 * transaction that was opened before the write. Runs under a 10000 ms JUnit timeout.
 */
94 @Test(timeout = 10000)
95 public void testTransactionIsolation() throws InterruptedException, ExecutionException {
97 assertNotNull(domBroker);
// The read-only transaction is opened first, before anything is written.
99 DOMDataReadTransaction readTx = domBroker.newReadOnlyTransaction();
100 assertNotNull(readTx);
102 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
103 assertNotNull(writeTx);
106 * Writes /test in writeTx
109 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
112 * Reads /test from writeTx Read should return container.
115 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
116 .read(OPERATIONAL, TestModel.TEST_PATH);
117 assertTrue(writeTxContainer.get().isPresent());
120 * Reads /test from readTx Read should return Absent.
// writeTx is never committed in this test, so readTx must not observe the node.
123 ListenableFuture<Optional<NormalizedNode<?, ?>>> readTxContainer = readTx
124 .read(OPERATIONAL, TestModel.TEST_PATH);
125 assertFalse(readTxContainer.get().isPresent());
/**
 * Writes /test in a read-write transaction, confirms it is readable inside that transaction,
 * completes the transaction with {@code commit()}, and then verifies that a newly opened
 * read-only transaction sees the node. Runs under a 10000 ms JUnit timeout.
 */
128 @Test(timeout = 10000)
129 public void testTransactionCommit() throws InterruptedException, ExecutionException, TimeoutException {
131 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
132 assertNotNull(writeTx);
135 * Writes /test in writeTx
138 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
141 * Reads /test from writeTx Read should return container.
144 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
145 .read(OPERATIONAL, TestModel.TEST_PATH);
146 assertTrue(writeTxContainer.get().isPresent());
// Wait at most 5 seconds for the commit to finish.
148 writeTx.commit().get(5, TimeUnit.SECONDS);
// A fresh read-only transaction must now see the written node.
150 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
151 .read(OPERATIONAL, TestModel.TEST_PATH).get();
152 assertTrue(afterCommitRead.isPresent());
/**
 * Same scenario as the commit test, but completes the transaction through {@code submit()}:
 * writes /test, reads it back inside the transaction, submits, then verifies that a newly
 * opened read-only transaction sees the node. Runs under a 10000 ms JUnit timeout.
 *
 * <p>NOTE(review): the embedded numbering skips 156 between the annotation and the signature;
 * a line (possibly another annotation) is not visible here. Confirm against the full file.
 */
155 @Test(timeout = 10000)
157 public void testTransactionSubmit() throws InterruptedException, ExecutionException, TimeoutException {
159 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
160 assertNotNull(writeTx);
163 * Writes /test in writeTx
166 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
169 * Reads /test from writeTx Read should return container.
172 ListenableFuture<Optional<NormalizedNode<?, ?>>> writeTxContainer = writeTx
173 .read(OPERATIONAL, TestModel.TEST_PATH);
174 assertTrue(writeTxContainer.get().isPresent());
// Wait at most 5 seconds for the submit to finish.
176 writeTx.submit().get(5, TimeUnit.SECONDS);
// A fresh read-only transaction must now see the written node.
178 Optional<NormalizedNode<?, ?>> afterCommitRead = domBroker.newReadOnlyTransaction()
179 .read(OPERATIONAL, TestModel.TEST_PATH).get();
180 assertTrue(afterCommitRead.isPresent());
/**
 * Expects a {@code TransactionCommitFailedException} when the commit executor rejects work:
 * the delegate inside {@code commitExecutor} is replaced by a Mockito mock whose
 * {@code execute()} throws {@code RejectedExecutionException}, and a write transaction is
 * then committed.
 *
 * <p>NOTE(review): the {@code try} opener and the body of the {@code catch} clause are not
 * visible in this view (the embedded numbering skips 199 and 202 onward). Presumably the
 * catch rethrows the cause of the ExecutionException so the expected exception surfaces;
 * confirm against the full file.
 */
183 @Test(expected = TransactionCommitFailedException.class)
184 @SuppressWarnings({"checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
185 public void testRejectedCommit() throws Throwable {
// Swap the real delegate for a mock that rejects every execute() call.
187 commitExecutor.delegate = Mockito.mock(ExecutorService.class);
188 Mockito.doThrow(new RejectedExecutionException("mock")).when(commitExecutor.delegate)
189 .execute(Mockito.any(Runnable.class));
// Stub the remaining calls with benign results: shutdown() does nothing, shutdownNow()
// returns an empty list, toString() returns "", awaitTermination() returns true.
190 Mockito.doNothing().when(commitExecutor.delegate).shutdown();
191 Mockito.doReturn(Collections.emptyList()).when(commitExecutor.delegate).shutdownNow();
192 Mockito.doReturn("").when(commitExecutor.delegate).toString();
193 Mockito.doReturn(true).when(commitExecutor.delegate)
194 .awaitTermination(Mockito.anyLong(), Mockito.any(TimeUnit.class));
196 DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
197 writeTx.put(OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
// Wait at most 5 seconds for the commit future; a failure arrives as an ExecutionException.
200 writeTx.commit().get(5, TimeUnit.SECONDS);
201 } catch (ExecutionException e) {
/**
 * Helper that takes a write transaction and returns an {@code AtomicReference<Throwable>}.
 *
 * <p>NOTE(review): only the signature, the creation of the initially empty {@code caughtEx}
 * reference and a {@code catch (Throwable e)} clause are visible in this view (the embedded
 * numbering skips 209-211 and 213-219). Presumably the method submits {@code writeTx} on
 * another thread and records any failure in {@code caughtEx}; confirm against the full file.
 *
 * @param writeTx the write transaction handed to this helper
 * @return an AtomicReference typed to hold a Throwable
 */
206 @SuppressWarnings("checkstyle:IllegalCatch")
207 AtomicReference<Throwable> submitTxAsync(final DOMDataWriteTransaction writeTx) {
// Starts out holding null.
208 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
212 } catch (Throwable e) {
221 static class CommitExecutorService extends ForwardingExecutorService {
223 ExecutorService delegate;
225 CommitExecutorService(final ExecutorService delegate) {
226 this.delegate = delegate;
230 protected ExecutorService delegate() {