/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.databroker;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.inOrder;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFalseFluentFuture;
23 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateNullFluentFuture;
24 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateTrueFluentFuture;
26 import com.google.common.base.Throwables;
27 import com.google.common.collect.ClassToInstanceMap;
28 import com.google.common.collect.ImmutableMap;
29 import com.google.common.util.concurrent.FluentFuture;
30 import com.google.common.util.concurrent.FutureCallback;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.MoreExecutors;
34 import com.google.common.util.concurrent.SettableFuture;
35 import com.google.common.util.concurrent.Uninterruptibles;
36 import java.util.ArrayList;
37 import java.util.Arrays;
38 import java.util.Collection;
39 import java.util.List;
40 import java.util.concurrent.CountDownLatch;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.SynchronousQueue;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import java.util.concurrent.atomic.AtomicReference;
47 import org.junit.After;
48 import org.junit.Before;
49 import org.junit.Test;
50 import org.mockito.InOrder;
51 import org.mockito.stubbing.Answer;
52 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.mdsal.common.api.CommitInfo;
55 import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
56 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
57 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
58 import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
66 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
67 import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
68 import org.opendaylight.mdsal.dom.broker.TransactionCommitFailedExceptionMapper;
69 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
70 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
71 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
72 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
73 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
74 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
75 import org.opendaylight.mdsal.dom.store.inmemory.InMemoryDOMDataStore;
76 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
77 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
 * Unit tests for ConcurrentDOMDataBroker.
 *
 * @author Thomas Pantelis
 */
84 public class ConcurrentDOMDataBrokerTest {
86 private final DOMDataTreeWriteTransaction transaction = mock(DOMDataTreeWriteTransaction.class);
87 private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
88 private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
89 private final ThreadPoolExecutor futureExecutor =
90 new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<>());
91 private ConcurrentDOMDataBroker coordinator;
95 doReturn("tx").when(transaction).getIdentifier();
97 DOMStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.newDirectExecutorService());
99 coordinator = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, store),
104 public void tearDown() {
105 futureExecutor.shutdownNow();
109 public void testSuccessfulSubmitAsync() throws Exception {
110 testSuccessfulSubmit(true);
114 public void testSuccessfulSubmitSync() throws Exception {
115 testSuccessfulSubmit(false);
118 private void testSuccessfulSubmit(final boolean doAsync) throws InterruptedException {
119 final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1);
120 Answer<ListenableFuture<Boolean>> asyncCanCommit = invocation -> {
121 final SettableFuture<Boolean> future = SettableFuture.create();
124 Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
125 10, TimeUnit.SECONDS);
126 future.set(Boolean.TRUE);
129 future.set(Boolean.TRUE);
135 doAnswer(asyncCanCommit).when(mockCohort1).canCommit();
136 doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
137 doReturn(immediateNullFluentFuture()).when(mockCohort1).commit();
139 doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
140 doReturn(immediateNullFluentFuture()).when(mockCohort2).preCommit();
141 doReturn(immediateNullFluentFuture()).when(mockCohort2).commit();
143 ListenableFuture<? extends CommitInfo> future =
144 coordinator.commit(transaction, Arrays.asList(mockCohort1, mockCohort2));
146 final CountDownLatch doneLatch = new CountDownLatch(1);
147 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
148 Futures.addCallback(future, new FutureCallback<CommitInfo>() {
150 public void onSuccess(final CommitInfo result) {
151 doneLatch.countDown();
155 public void onFailure(final Throwable failure) {
156 caughtEx.set(failure);
157 doneLatch.countDown();
159 }, MoreExecutors.directExecutor());
161 asyncCanCommitContinue.countDown();
163 assertTrue("Submit complete", doneLatch.await(5, TimeUnit.SECONDS));
165 if (caughtEx.get() != null) {
166 Throwables.throwIfUnchecked(caughtEx.get());
167 throw new RuntimeException(caughtEx.get());
170 assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
172 InOrder inOrder = inOrder(mockCohort1, mockCohort2);
173 inOrder.verify(mockCohort1).canCommit();
174 inOrder.verify(mockCohort2).canCommit();
175 inOrder.verify(mockCohort1).preCommit();
176 inOrder.verify(mockCohort2).preCommit();
177 inOrder.verify(mockCohort1).commit();
178 inOrder.verify(mockCohort2).commit();
182 public void testSubmitWithNegativeCanCommitResponse() throws Exception {
183 doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
184 doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
186 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort2).canCommit();
187 doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
189 DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
190 doReturn(Futures.immediateFuture(Boolean.FALSE)).when(mockCohort3).canCommit();
191 doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
193 ListenableFuture<? extends CommitInfo> future = coordinator.commit(
194 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
196 assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
199 private static void assertFailure(final ListenableFuture<?> future, final Exception expCause,
200 final DOMStoreThreePhaseCommitCohort... mockCohorts) throws Exception {
202 future.get(5, TimeUnit.SECONDS);
203 fail("Expected TransactionCommitFailedException");
204 } catch (ExecutionException e) {
205 TransactionCommitFailedException tcf = TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
206 if (expCause != null) {
207 assertSame("Expected cause", expCause.getClass(), tcf.getCause().getClass());
210 InOrder inOrder = inOrder((Object[])mockCohorts);
211 for (DOMStoreThreePhaseCommitCohort c: mockCohorts) {
212 inOrder.verify(c).abort();
214 } catch (TimeoutException e) {
220 public void testSubmitWithCanCommitException() throws Exception {
221 doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
222 doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
224 IllegalStateException cause = new IllegalStateException("mock");
225 doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
226 doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
228 FluentFuture<? extends CommitInfo> future = coordinator.commit(
229 transaction, Arrays.asList(mockCohort1, mockCohort2));
231 assertFailure(future, cause, mockCohort1, mockCohort2);
235 public void testSubmitWithCanCommitDataStoreUnavailableException() throws Exception {
236 doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
237 doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
238 NoShardLeaderException rootCause = new NoShardLeaderException("mock");
239 DataStoreUnavailableException cause = new DataStoreUnavailableException(rootCause.getMessage(), rootCause);
240 doReturn(Futures.immediateFailedFuture(rootCause)).when(mockCohort2).canCommit();
241 doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
243 FluentFuture<? extends CommitInfo> future = coordinator.commit(
244 transaction, Arrays.asList(mockCohort1, mockCohort2));
246 assertFailure(future, cause, mockCohort1, mockCohort2);
250 public void testSubmitWithPreCommitException() throws Exception {
251 doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
252 doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
253 doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
255 doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
256 IllegalStateException cause = new IllegalStateException("mock");
257 doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit();
258 doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
260 DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
261 doReturn(immediateTrueFluentFuture()).when(mockCohort3).canCommit();
262 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2")))
263 .when(mockCohort3).preCommit();
264 doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
266 FluentFuture<? extends CommitInfo> future = coordinator.commit(
267 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
269 assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
273 public void testSubmitWithCommitException() throws Exception {
274 doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
275 doReturn(immediateNullFluentFuture()).when(mockCohort1).preCommit();
276 doReturn(immediateNullFluentFuture()).when(mockCohort1).commit();
277 doReturn(immediateNullFluentFuture()).when(mockCohort1).abort();
279 doReturn(immediateTrueFluentFuture()).when(mockCohort2).canCommit();
280 doReturn(immediateNullFluentFuture()).when(mockCohort2).preCommit();
281 IllegalStateException cause = new IllegalStateException("mock");
282 doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit();
283 doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
285 DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
286 doReturn(immediateTrueFluentFuture()).when(mockCohort3).canCommit();
287 doReturn(immediateNullFluentFuture()).when(mockCohort3).preCommit();
288 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2")))
289 .when(mockCohort3).commit();
290 doReturn(immediateNullFluentFuture()).when(mockCohort3).abort();
292 FluentFuture<? extends CommitInfo> future = coordinator.commit(
293 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
295 assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
299 public void testSubmitWithAbortException() throws Exception {
300 doReturn(immediateTrueFluentFuture()).when(mockCohort1).canCommit();
301 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error")))
302 .when(mockCohort1).abort();
304 IllegalStateException cause = new IllegalStateException("mock canCommit error");
305 doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
306 doReturn(immediateNullFluentFuture()).when(mockCohort2).abort();
308 FluentFuture<? extends CommitInfo> future = coordinator.commit(
309 transaction, Arrays.asList(mockCohort1, mockCohort2));
311 assertFailure(future, cause, mockCohort1, mockCohort2);
315 public void testCreateReadWriteTransaction() {
316 DOMStore domStore = mock(DOMStore.class);
317 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
318 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
320 dataBroker.newReadWriteTransaction();
322 verify(domStore, never()).newReadWriteTransaction();
327 public void testCreateWriteOnlyTransaction() {
328 DOMStore domStore = mock(DOMStore.class);
329 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
330 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
332 dataBroker.newWriteOnlyTransaction();
334 verify(domStore, never()).newWriteOnlyTransaction();
339 public void testCreateReadOnlyTransaction() {
340 DOMStore domStore = mock(DOMStore.class);
341 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
342 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
344 dataBroker.newReadOnlyTransaction();
346 verify(domStore, never()).newReadOnlyTransaction();
351 public void testLazySubTransactionCreationForReadWriteTransactions() {
352 DOMStore configDomStore = mock(DOMStore.class);
353 DOMStore operationalDomStore = mock(DOMStore.class);
354 DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class);
356 doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction();
357 doReturn(storeTxn).when(configDomStore).newReadWriteTransaction();
359 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
360 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
361 configDomStore), futureExecutor)) {
362 DOMDataTreeReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction();
364 dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
365 dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
366 dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty());
368 verify(configDomStore, never()).newReadWriteTransaction();
369 verify(operationalDomStore, times(1)).newReadWriteTransaction();
371 dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
373 verify(configDomStore, times(1)).newReadWriteTransaction();
374 verify(operationalDomStore, times(1)).newReadWriteTransaction();
380 public void testLazySubTransactionCreationForWriteOnlyTransactions() {
381 DOMStore configDomStore = mock(DOMStore.class);
382 DOMStore operationalDomStore = mock(DOMStore.class);
383 DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class);
385 doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction();
386 doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction();
388 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
389 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
390 configDomStore), futureExecutor)) {
391 DOMDataTreeWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction();
393 dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
394 dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
396 verify(configDomStore, never()).newWriteOnlyTransaction();
397 verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
399 dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(), mock(NormalizedNode.class));
401 verify(configDomStore, times(1)).newWriteOnlyTransaction();
402 verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
407 public void testLazySubTransactionCreationForReadOnlyTransactions() {
408 DOMStore configDomStore = mock(DOMStore.class);
409 DOMStore operationalDomStore = mock(DOMStore.class);
410 DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class);
412 doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction();
413 doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction();
415 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
416 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
417 configDomStore), futureExecutor)) {
418 DOMDataTreeReadTransaction dataTxn = dataBroker.newReadOnlyTransaction();
420 dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty());
421 dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty());
423 verify(configDomStore, never()).newReadOnlyTransaction();
424 verify(operationalDomStore, times(1)).newReadOnlyTransaction();
426 dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty());
428 verify(configDomStore, times(1)).newReadOnlyTransaction();
429 verify(operationalDomStore, times(1)).newReadOnlyTransaction();
434 public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException {
435 DOMStore configDomStore = mock(DOMStore.class);
436 DOMStore operationalDomStore = mock(DOMStore.class);
437 DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
438 DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
440 doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
441 doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
442 doReturn(immediateFalseFluentFuture()).when(mockCohort).canCommit();
443 doReturn(immediateNullFluentFuture()).when(mockCohort).abort();
445 final CountDownLatch latch = new CountDownLatch(1);
446 final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
448 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
449 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
450 configDomStore), futureExecutor) {
452 public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
453 Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
454 commitCohorts.addAll(cohorts);
456 return super.commit(writeTx, cohorts);
459 DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
461 domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty());
463 domDataReadWriteTransaction.commit();
465 assertTrue(latch.await(10, TimeUnit.SECONDS));
467 assertTrue(commitCohorts.size() == 1);
472 public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException {
473 DOMStore configDomStore = mock(DOMStore.class);
474 DOMStore operationalDomStore = mock(DOMStore.class);
475 DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
476 DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class);
477 DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class);
478 DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class);
480 doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction();
481 doReturn(configTransaction).when(configDomStore).newReadWriteTransaction();
483 doReturn(mockCohortOperational).when(operationalTransaction).ready();
484 doReturn(immediateFalseFluentFuture()).when(mockCohortOperational).canCommit();
485 doReturn(immediateNullFluentFuture()).when(mockCohortOperational).abort();
487 doReturn(mockCohortConfig).when(configTransaction).ready();
488 doReturn(immediateFalseFluentFuture()).when(mockCohortConfig).canCommit();
489 doReturn(immediateNullFluentFuture()).when(mockCohortConfig).abort();
491 final CountDownLatch latch = new CountDownLatch(1);
492 final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList<>();
494 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
495 LogicalDatastoreType.OPERATIONAL, operationalDomStore, LogicalDatastoreType.CONFIGURATION,
496 configDomStore), futureExecutor) {
498 @SuppressWarnings("checkstyle:hiddenField")
499 public FluentFuture<? extends CommitInfo> commit(DOMDataTreeWriteTransaction writeTx,
500 Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
501 commitCohorts.addAll(cohorts);
503 return super.commit(writeTx, cohorts);
506 DOMDataTreeReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
508 domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(),
509 mock(NormalizedNode.class));
510 domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty(),
511 mock(NormalizedNode.class));
513 domDataReadWriteTransaction.commit();
515 assertTrue(latch.await(10, TimeUnit.SECONDS));
517 assertTrue(commitCohorts.size() == 2);
522 public void testCreateTransactionChain() {
523 DOMStore domStore = mock(DOMStore.class);
524 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
525 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
528 dataBroker.createTransactionChain(mock(DOMTransactionChainListener.class));
530 verify(domStore, times(2)).createTransactionChain();
536 public void testCreateTransactionOnChain() {
537 DOMStore domStore = mock(DOMStore.class);
538 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
539 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
542 DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
543 DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class);
545 doReturn(mockChain).when(domStore).createTransactionChain();
546 doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction();
548 DOMTransactionChain transactionChain = dataBroker.createTransactionChain(
549 mock(DOMTransactionChainListener.class));
551 DOMDataTreeWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction();
553 verify(mockChain, never()).newWriteOnlyTransaction();
555 domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty(),
556 mock(NormalizedNode.class));
561 public void testEmptyTransactionSubmitSucceeds() throws ExecutionException, InterruptedException {
562 DOMStore domStore = mock(DOMStore.class);
563 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
564 LogicalDatastoreType.OPERATIONAL, domStore, LogicalDatastoreType.CONFIGURATION, domStore),
567 FluentFuture<? extends CommitInfo> submit1 = dataBroker.newWriteOnlyTransaction().commit();
569 assertNotNull(submit1);
573 FluentFuture<? extends CommitInfo> submit2 = dataBroker.newReadWriteTransaction().commit();
575 assertNotNull(submit2);
582 public void testExtensions() {
583 DistributedDataStore mockConfigStore = mock(DistributedDataStore.class);
584 DistributedDataStore mockOperStore = mock(DistributedDataStore.class);
585 try (ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(
586 LogicalDatastoreType.OPERATIONAL, mockOperStore,
587 LogicalDatastoreType.CONFIGURATION, mockConfigStore), futureExecutor)) {
589 ClassToInstanceMap<DOMDataBrokerExtension> supportedExtensions = dataBroker.getExtensions();
590 assertNotNull(supportedExtensions.getInstance(DOMDataTreeChangeService.class));
592 DOMDataTreeCommitCohortRegistry cohortRegistry = supportedExtensions.getInstance(
593 DOMDataTreeCommitCohortRegistry.class);
594 assertNotNull(cohortRegistry);
596 DOMDataTreeCommitCohort mockCohort = mock(DOMDataTreeCommitCohort.class);
597 DOMDataTreeIdentifier path = new DOMDataTreeIdentifier(
598 org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION,
599 YangInstanceIdentifier.empty());
600 cohortRegistry.registerCommitCohort(path, mockCohort);
602 verify(mockConfigStore).registerCommitCohort(path, mockCohort);