Merge "Adding sal-compatibilty feature"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorPath;
4 import akka.actor.ActorSelection;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
7
8 import com.google.common.collect.Lists;
9 import com.google.common.util.concurrent.ListenableFuture;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.any;
15 import static org.mockito.Mockito.isA;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.times;
19
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.Mock;
23 import org.mockito.MockitoAnnotations;
24 import org.mockito.stubbing.Stubber;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
36
37 import scala.concurrent.Future;
38 import scala.concurrent.duration.FiniteDuration;
39
40 import java.util.List;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43
44 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
45
46     @SuppressWarnings("serial")
47     static class TestException extends RuntimeException {
48     }
49
50     @Mock
51     private ActorContext actorContext;
52
53     @Before
54     public void setUp() {
55         MockitoAnnotations.initMocks(this);
56
57         doReturn(getSystem()).when(actorContext).getActorSystem();
58     }
59
60     private Future<ActorPath> newCohortPath() {
61         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
62         doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
63         return Futures.successful(path);
64     }
65
66     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
67         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
68         for(int i = 1; i <= nCohorts; i++) {
69             cohortPathFutures.add(newCohortPath());
70         }
71
72         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
73     }
74
75     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
76             throws Exception {
77         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
78         cohortPathFutures.add(newCohortPath());
79         cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
80
81         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
82     }
83
84     private void setupMockActorContext(Class<?> requestType, Object... responses) {
85         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
86                 .failed((Throwable) responses[0]) : Futures
87                 .successful(((SerializableMessage) responses[0]).toSerializable()));
88
89         for(int i = 1; i < responses.length; i++) {
90             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
91                     .failed((Throwable) responses[i]) : Futures
92                     .successful(((SerializableMessage) responses[i]).toSerializable()));
93         }
94
95         stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
96                 isA(requestType), any(FiniteDuration.class));
97     }
98
99     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
100         verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
101                 any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
102     }
103
104     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
105
106         try {
107             future.get(5, TimeUnit.SECONDS);
108             fail("Expected ExecutionException");
109         } catch(ExecutionException e) {
110             throw e.getCause();
111         }
112     }
113
114     @Test
115     public void testCanCommitWithOneCohort() throws Exception {
116
117         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
118
119         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
120                 new CanCommitTransactionReply(true));
121
122         ListenableFuture<Boolean> future = proxy.canCommit();
123
124         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
125
126         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
127                 new CanCommitTransactionReply(false));
128
129         future = proxy.canCommit();
130
131         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
132
133         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
134     }
135
136     @Test
137     public void testCanCommitWithMultipleCohorts() throws Exception {
138
139         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
140
141         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
142                 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
143
144         ListenableFuture<Boolean> future = proxy.canCommit();
145
146         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
147
148         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
149     }
150
151     @Test
152     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
153
154         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
155
156         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
157                 new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
158                 new CanCommitTransactionReply(true));
159
160         ListenableFuture<Boolean> future = proxy.canCommit();
161
162         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
163
164         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
165     }
166
167     @Test(expected = TestException.class)
168     public void testCanCommitWithExceptionFailure() throws Throwable {
169
170         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
171
172         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
173
174         propagateExecutionExceptionCause(proxy.canCommit());
175     }
176
177     @Test(expected = ExecutionException.class)
178     public void testCanCommitWithInvalidResponseType() throws Exception {
179
180         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
181
182         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
183                 new PreCommitTransactionReply());
184
185         proxy.canCommit().get(5, TimeUnit.SECONDS);
186     }
187
188     @Test(expected = TestException.class)
189     public void testCanCommitWithFailedCohortPath() throws Throwable {
190
191         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
192
193         try {
194             propagateExecutionExceptionCause(proxy.canCommit());
195         } finally {
196             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
197         }
198     }
199
200     @Test
201     public void testPreCommit() throws Exception {
202         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
203
204         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
205                 new PreCommitTransactionReply());
206
207         proxy.preCommit().get(5, TimeUnit.SECONDS);
208
209         verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
210     }
211
212     @Test(expected = ExecutionException.class)
213     public void testPreCommitWithFailure() throws Exception {
214         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
215
216         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
217                 new PreCommitTransactionReply(), new RuntimeException("mock"));
218
219         proxy.preCommit().get(5, TimeUnit.SECONDS);
220     }
221
222     @Test
223     public void testAbort() throws Exception {
224         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
225
226         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
227
228         proxy.abort().get(5, TimeUnit.SECONDS);
229
230         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
231     }
232
233     @Test
234     public void testAbortWithFailure() throws Exception {
235         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
236
237         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
238
239         // The exception should not get propagated.
240         proxy.abort().get(5, TimeUnit.SECONDS);
241
242         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
243     }
244
245     @Test
246     public void testAbortWithFailedCohortPath() throws Throwable {
247
248         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
249
250         // The exception should not get propagated.
251         proxy.abort().get(5, TimeUnit.SECONDS);
252
253         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
254     }
255
256     @Test
257     public void testCommit() throws Exception {
258
259         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
260
261         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
262                 new CommitTransactionReply());
263
264         proxy.commit().get(5, TimeUnit.SECONDS);
265
266         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
267     }
268
269     @Test(expected = TestException.class)
270     public void testCommitWithFailure() throws Throwable {
271
272         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
273
274         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
275                 new TestException());
276
277         propagateExecutionExceptionCause(proxy.commit());
278     }
279
280     @Test(expected = ExecutionException.class)
281     public void testCommitWithInvalidResponseType() throws Exception {
282
283         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
284
285         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
286
287         proxy.commit().get(5, TimeUnit.SECONDS);
288     }
289
290     @Test(expected = TestException.class)
291     public void testCommitWithFailedCohortPath() throws Throwable {
292
293         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
294
295         try {
296             propagateExecutionExceptionCause(proxy.commit());
297         } finally {
298             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
299         }
300     }
301
302     @Test
303     public void testAllThreePhasesSuccessful() throws Exception {
304
305         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
306
307         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
308                 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
309
310         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
311                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
312
313         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
314                 new CommitTransactionReply(), new CommitTransactionReply());
315
316         proxy.canCommit().get(5, TimeUnit.SECONDS);
317         proxy.preCommit().get(5, TimeUnit.SECONDS);
318         proxy.commit().get(5, TimeUnit.SECONDS);
319
320         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
321         verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
322         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
323     }
324 }