Merge "Avoid IllegalArgument on missing source"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ConcurrentDOMDataBrokerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertSame;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.doAnswer;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.inOrder;
16 import static org.mockito.Mockito.mock;
17 import com.google.common.collect.ImmutableMap;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.SettableFuture;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.Arrays;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.SynchronousQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.TimeoutException;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.InOrder;
36 import org.mockito.invocation.InvocationOnMock;
37 import org.mockito.stubbing.Answer;
38 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
39 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
40 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
41 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
43 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
44
45 /**
46  * Unit tests for DOMConcurrentDataCommitCoordinator.
47  *
48  * @author Thomas Pantelis
49  */
50 public class ConcurrentDOMDataBrokerTest {
51
52     private final DOMDataWriteTransaction transaction = mock(DOMDataWriteTransaction.class);
53     private final DOMStoreThreePhaseCommitCohort mockCohort1 = mock(DOMStoreThreePhaseCommitCohort.class);
54     private final DOMStoreThreePhaseCommitCohort mockCohort2 = mock(DOMStoreThreePhaseCommitCohort.class);
55     private final ThreadPoolExecutor futureExecutor =
56             new ThreadPoolExecutor(0, 1, 5, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
57     private ConcurrentDOMDataBroker coordinator;
58
59     @Before
60     public void setup() {
61         doReturn("tx").when(transaction).getIdentifier();
62
63         DOMStore store = new InMemoryDOMDataStore("OPER",
64             MoreExecutors.sameThreadExecutor());
65
66         coordinator = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL, store), futureExecutor);
67     }
68
69     @After
70     public void tearDown() {
71         futureExecutor.shutdownNow();
72     }
73
74     @Test
75     public void testSuccessfulSubmitAsync() throws Throwable {
76         testSuccessfulSubmit(true);
77     }
78
79     @Test
80     public void testSuccessfulSubmitSync() throws Throwable {
81         testSuccessfulSubmit(false);
82     }
83
84     private void testSuccessfulSubmit(final boolean doAsync) throws Throwable {
85         final CountDownLatch asyncCanCommitContinue = new CountDownLatch(1);
86         Answer<ListenableFuture<Boolean>> asyncCanCommit = new Answer<ListenableFuture<Boolean>>() {
87             @Override
88             public ListenableFuture<Boolean> answer(final InvocationOnMock invocation) {
89                 final SettableFuture<Boolean> future = SettableFuture.create();
90                 if(doAsync) {
91                     new Thread() {
92                         @Override
93                         public void run() {
94                             Uninterruptibles.awaitUninterruptibly(asyncCanCommitContinue,
95                                     10, TimeUnit.SECONDS);
96                             future.set(true);
97                         }
98                     }.start();
99                 } else {
100                     future.set(true);
101                 }
102
103                 return future;
104             }
105         };
106
107         doAnswer(asyncCanCommit).when(mockCohort1).canCommit();
108         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
109         doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
110
111         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
112         doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
113         doReturn(Futures.immediateFuture(null)).when(mockCohort2).commit();
114
115         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
116                 transaction, Arrays.asList(mockCohort1, mockCohort2));
117
118         final CountDownLatch doneLatch = new CountDownLatch(1);
119         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
120         Futures.addCallback(future, new FutureCallback<Void>() {
121             @Override
122             public void onSuccess(final Void result) {
123                 doneLatch.countDown();
124             }
125
126             @Override
127             public void onFailure(final Throwable t) {
128                 caughtEx.set(t);
129                 doneLatch.countDown();
130             }
131         });
132
133         asyncCanCommitContinue.countDown();
134
135         assertEquals("Submit complete", true, doneLatch.await(5, TimeUnit.SECONDS));
136
137         if(caughtEx.get() != null) {
138             throw caughtEx.get();
139         }
140
141         assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
142
143         InOrder inOrder = inOrder(mockCohort1, mockCohort2);
144         inOrder.verify(mockCohort1).canCommit();
145         inOrder.verify(mockCohort2).canCommit();
146         inOrder.verify(mockCohort1).preCommit();
147         inOrder.verify(mockCohort2).preCommit();
148         inOrder.verify(mockCohort1).commit();
149         inOrder.verify(mockCohort2).commit();
150     }
151
152     @Test
153     public void testSubmitWithNegativeCanCommitResponse() throws Exception {
154         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
155         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
156
157         doReturn(Futures.immediateFuture(false)).when(mockCohort2).canCommit();
158         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
159
160         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
161         doReturn(Futures.immediateFuture(false)).when(mockCohort3).canCommit();
162         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
163
164         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
165                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
166
167         assertFailure(future, null, mockCohort1, mockCohort2, mockCohort3);
168     }
169
170     private void assertFailure(final CheckedFuture<Void, TransactionCommitFailedException> future,
171             final Exception expCause, final DOMStoreThreePhaseCommitCohort... mockCohorts)
172                     throws Exception {
173         try {
174             future.checkedGet(5, TimeUnit.SECONDS);
175             fail("Expected TransactionCommitFailedException");
176         } catch (TransactionCommitFailedException e) {
177             if(expCause != null) {
178                 assertSame("Expected cause", expCause, e.getCause());
179             }
180
181             InOrder inOrder = inOrder((Object[])mockCohorts);
182             for(DOMStoreThreePhaseCommitCohort c: mockCohorts) {
183                 inOrder.verify(c).abort();
184             }
185         } catch (TimeoutException e) {
186             throw e;
187         }
188     }
189
190     @Test
191     public void testSubmitWithCanCommitException() throws Exception {
192         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
193         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
194
195         IllegalStateException cause = new IllegalStateException("mock");
196         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
197         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
198
199         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
200                 transaction, Arrays.asList(mockCohort1, mockCohort2));
201
202         assertFailure(future, cause, mockCohort1, mockCohort2);
203     }
204
205     @Test
206     public void testSubmitWithPreCommitException() throws Exception {
207         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
208         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
209         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
210
211         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
212         IllegalStateException cause = new IllegalStateException("mock");
213         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).preCommit();
214         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
215
216         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
217         doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
218         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))).
219                 when(mockCohort3).preCommit();
220         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
221
222         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
223                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
224
225         assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
226     }
227
228     @Test
229     public void testSubmitWithCommitException() throws Exception {
230         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
231         doReturn(Futures.immediateFuture(null)).when(mockCohort1).preCommit();
232         doReturn(Futures.immediateFuture(null)).when(mockCohort1).commit();
233         doReturn(Futures.immediateFuture(null)).when(mockCohort1).abort();
234
235         doReturn(Futures.immediateFuture(true)).when(mockCohort2).canCommit();
236         doReturn(Futures.immediateFuture(null)).when(mockCohort2).preCommit();
237         IllegalStateException cause = new IllegalStateException("mock");
238         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).commit();
239         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
240
241         DOMStoreThreePhaseCommitCohort mockCohort3 = mock(DOMStoreThreePhaseCommitCohort.class);
242         doReturn(Futures.immediateFuture(true)).when(mockCohort3).canCommit();
243         doReturn(Futures.immediateFuture(null)).when(mockCohort3).preCommit();
244         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock2"))).
245                 when(mockCohort3).commit();
246         doReturn(Futures.immediateFuture(null)).when(mockCohort3).abort();
247
248         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
249                 transaction, Arrays.asList(mockCohort1, mockCohort2, mockCohort3));
250
251         assertFailure(future, cause, mockCohort1, mockCohort2, mockCohort3);
252     }
253
254     @Test
255     public void testSubmitWithAbortException() throws Exception {
256         doReturn(Futures.immediateFuture(true)).when(mockCohort1).canCommit();
257         doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock abort error"))).
258                 when(mockCohort1).abort();
259
260         IllegalStateException cause = new IllegalStateException("mock canCommit error");
261         doReturn(Futures.immediateFailedFuture(cause)).when(mockCohort2).canCommit();
262         doReturn(Futures.immediateFuture(null)).when(mockCohort2).abort();
263
264         CheckedFuture<Void, TransactionCommitFailedException> future = coordinator.submit(
265                 transaction, Arrays.asList(mockCohort1, mockCohort2));
266
267         assertFailure(future, cause, mockCohort1, mockCohort2);
268     }
269 }