2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertSame;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.doAnswer;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.inOrder;
16 import static org.mockito.Mockito.mock;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.SettableFuture;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.Arrays;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.SynchronousQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.invocation.InvocationOnMock;
37 import org.mockito.stubbing.Answer;
38 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
39 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
40 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
41 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
43 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
46 * Unit tests for DOMConcurrentDataCommitCoordinator.
48 * @author Thomas Pantelis
50 public class ConcurrentDOMDataBrokerTest {
52 private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class);
53 private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
54 private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
55 private final ThreadPoolExecutor futureExecutor =
56 new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
57 private ConcurrentDOMDataBroker coordinator;
61 doReturn("tx").when(transaction).getIdentifier();
63 DOMStore store = new InMemoryDOMDataStore("OPER",
64 MoreExecutors.sameThreadExecutor());
66 coordinator = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, store), futureExecutor);
70 public void tearDown() {
71 futureExecutor.shutdownNow();
75 public void testSuccessfulSubmitAsync() throws Throwable {
76 testSuccessfulSubmit(true);
80 public void testSuccessfulSubmitSync() throws Throwable {
81 testSuccessfulSubmit(false);
84 private void testSuccessfulSubmit(final boolean doAsync) throws Throwable {
85 final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1);
86 Answer<ListenableFuture<Boolean>> asyncCanCommit = new Answer<ListenableFuture<Boolean>>() {
88 public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
89 final SettableFuture<Boolean> future = SettableFuture.create();
94 Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
95 10, TimeUnit.SECONDS);
107 doAnswer(asyncCanCommit).when(mockCohort1).canCommit();
108 doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
109 doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
111 doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
112 doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
113 doReturn(Futures.immediateFuture(null)).when(mockCohort2).commit();
115 CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
116 transaction, Arrays.asList(mockCohort1, mockCohort2));
118 final CountDownLatch doneLatch = new CountDownLatch(1);
119 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
120 Futures.addCallback(future, new FutureCallback<Void>() {
122 public void onSuccess(final Void result) {
123 doneLatch.countDown();
127 public void onFailure(final Throwable t) {
129 doneLatch.countDown();
133 asyncCanCommitContinue.countDown();
135 assertEquals("Submit complete", true, doneLatch.await(5, TimeUnit.SECONDS));
137 if(caughtEx.get() != null) {
138 throw caughtEx.get();
141 assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
143 InOrder inOrder = inOrder(mockCohort1, mockCohort2);
144 inOrder.verify(mockCohort1).canCommit();
145 inOrder.verify(mockCohort2).canCommit();
146 inOrder.verify(mockCohort1).preCommit();
147 inOrder.verify(mockCohort2).preCommit();
148 inOrder.verify(mockCohort1).commit();
149 inOrder.verify(mockCohort2).commit();
153 public void testSubmitWithNegativeCanCommitResponse() throws Exception {
154 doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
155 doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
157 doReturn(Futures.immediateFuture(false)).when(mockCohort2).canCommit();
158 doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
160 DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
161 doReturn(Futures.immediateFuture(false)).when(mockCohort3).canCommit();
162 doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
164 CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
165 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
167 assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
170 private void assertFailure(final CheckedFuture<Void, TransactionCommitFailedException> future,
171 final Exception expCause, final DOMStoreThreePhaseCommitCohort... mockCohorts)
174 future.checkedGet(5, TimeUnit.SECONDS);
175 fail("Expected TransactionCommitFailedException");
176 } catch (TransactionCommitFailedException e) {
177 if(expCause != null) {
178 assertSame("Expected cause", expCause, e.getCause());
181 InOrder inOrder = inOrder((Object[])mockCohorts);
182 for(DOMStoreThreePhaseCommitCohort c: mockCohorts) {
183 inOrder.verify(c).abort();
185 } catch (TimeoutException e) {
191 public void testSubmitWithCanCommitException() throws Exception {
192 doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
193 doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
195 IllegalStateException cause = new IllegalStateException("mock");
196 doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
197 doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
199 CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
200 transaction, Arrays.asList(mockCohort1, mockCohort2));
202 assertFailure(future, cause, mockCohort1, mockCohort2);
206 public void testSubmitWithPreCommitException() throws Exception {
207 doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
208 doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
209 doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
211 doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
212 IllegalStateException cause = new IllegalStateException("mock");
213 doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit();
214 doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
216 DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
217 doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
218 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))).
219 when(mockCohort3).preCommit();
220 doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
222 CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
223 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
225 assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
229 public void testSubmitWithCommitException() throws Exception {
230 doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
231 doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
232 doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
233 doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
235 doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
236 doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
237 IllegalStateException cause = new IllegalStateException("mock");
238 doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit();
239 doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
241 DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
242 doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
243 doReturn(Futures.immediateFuture(null)).when(mockCohort3).preCommit();
244 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))).
245 when(mockCohort3).commit();
246 doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
248 CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
249 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
251 assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
255 public void testSubmitWithAbortException() throws Exception {
256 doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
257 doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error"))).
258 when(mockCohort1).abort();
260 IllegalStateException cause = new IllegalStateException("mock canCommit error");
261 doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
262 doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
264 CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
265 transaction, Arrays.asList(mockCohort1, mockCohort2));
267 assertFailure(future, cause, mockCohort1, mockCohort2);