Merge "Bug 1686 - Rework TopologyManager to use transaction chaining in order to...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorPath;
4 import akka.actor.ActorSelection;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
7
8 import com.google.common.collect.Lists;
9 import com.google.common.util.concurrent.ListenableFuture;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.any;
15 import static org.mockito.Mockito.isA;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.times;
19
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.Mock;
23 import org.mockito.MockitoAnnotations;
24 import org.mockito.stubbing.Stubber;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
36
37 import scala.concurrent.Future;
38
39 import java.util.List;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.TimeUnit;
42
43 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
44
45     @SuppressWarnings("serial")
46     static class TestException extends RuntimeException {
47     }
48
49     @Mock
50     private ActorContext actorContext;
51
52     @Before
53     public void setUp() {
54         MockitoAnnotations.initMocks(this);
55
56         doReturn(getSystem()).when(actorContext).getActorSystem();
57     }
58
59     private Future<ActorPath> newCohortPath() {
60         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
61         doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
62         return Futures.successful(path);
63     }
64
65     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
66         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
67         for(int i = 1; i <= nCohorts; i++) {
68             cohortPathFutures.add(newCohortPath());
69         }
70
71         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
72     }
73
74     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
75             throws Exception {
76         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
77         cohortPathFutures.add(newCohortPath());
78         cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
79
80         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
81     }
82
83     private void setupMockActorContext(Class<?> requestType, Object... responses) {
84         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
85                 .failed((Throwable) responses[0]) : Futures
86                 .successful(((SerializableMessage) responses[0]).toSerializable()));
87
88         for(int i = 1; i < responses.length; i++) {
89             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
90                     .failed((Throwable) responses[i]) : Futures
91                     .successful(((SerializableMessage) responses[i]).toSerializable()));
92         }
93
94         stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
95                 isA(requestType));
96     }
97
98     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
99         verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
100                 any(ActorSelection.class), isA(requestType));
101     }
102
103     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
104
105         try {
106             future.get(5, TimeUnit.SECONDS);
107             fail("Expected ExecutionException");
108         } catch(ExecutionException e) {
109             throw e.getCause();
110         }
111     }
112
113     @Test
114     public void testCanCommitWithOneCohort() throws Exception {
115
116         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
117
118         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
119                 new CanCommitTransactionReply(true));
120
121         ListenableFuture<Boolean> future = proxy.canCommit();
122
123         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
124
125         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
126                 new CanCommitTransactionReply(false));
127
128         future = proxy.canCommit();
129
130         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
131
132         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
133     }
134
135     @Test
136     public void testCanCommitWithMultipleCohorts() throws Exception {
137
138         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
139
140         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
141                 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
142
143         ListenableFuture<Boolean> future = proxy.canCommit();
144
145         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
146
147         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
148     }
149
150     @Test
151     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
152
153         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
154
155         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
156                 new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
157                 new CanCommitTransactionReply(true));
158
159         ListenableFuture<Boolean> future = proxy.canCommit();
160
161         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
162
163         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
164     }
165
166     @Test(expected = TestException.class)
167     public void testCanCommitWithExceptionFailure() throws Throwable {
168
169         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
170
171         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
172
173         propagateExecutionExceptionCause(proxy.canCommit());
174     }
175
176     @Test(expected = ExecutionException.class)
177     public void testCanCommitWithInvalidResponseType() throws Exception {
178
179         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
180
181         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
182                 new PreCommitTransactionReply());
183
184         proxy.canCommit().get(5, TimeUnit.SECONDS);
185     }
186
187     @Test(expected = TestException.class)
188     public void testCanCommitWithFailedCohortPath() throws Throwable {
189
190         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
191
192         try {
193             propagateExecutionExceptionCause(proxy.canCommit());
194         } finally {
195             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
196         }
197     }
198
199     @Test
200     public void testPreCommit() throws Exception {
201         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
202
203         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
204                 new PreCommitTransactionReply());
205
206         proxy.preCommit().get(5, TimeUnit.SECONDS);
207
208         verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
209     }
210
211     @Test(expected = ExecutionException.class)
212     public void testPreCommitWithFailure() throws Exception {
213         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
214
215         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
216                 new PreCommitTransactionReply(), new RuntimeException("mock"));
217
218         proxy.preCommit().get(5, TimeUnit.SECONDS);
219     }
220
221     @Test
222     public void testAbort() throws Exception {
223         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
224
225         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
226
227         proxy.abort().get(5, TimeUnit.SECONDS);
228
229         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
230     }
231
232     @Test
233     public void testAbortWithFailure() throws Exception {
234         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
235
236         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
237
238         // The exception should not get propagated.
239         proxy.abort().get(5, TimeUnit.SECONDS);
240
241         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
242     }
243
244     @Test
245     public void testAbortWithFailedCohortPath() throws Throwable {
246
247         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
248
249         // The exception should not get propagated.
250         proxy.abort().get(5, TimeUnit.SECONDS);
251
252         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
253     }
254
255     @Test
256     public void testCommit() throws Exception {
257
258         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
259
260         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
261                 new CommitTransactionReply());
262
263         proxy.commit().get(5, TimeUnit.SECONDS);
264
265         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
266     }
267
268     @Test(expected = TestException.class)
269     public void testCommitWithFailure() throws Throwable {
270
271         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
272
273         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
274                 new TestException());
275
276         propagateExecutionExceptionCause(proxy.commit());
277     }
278
279     @Test(expected = ExecutionException.class)
280     public void testCommitWithInvalidResponseType() throws Exception {
281
282         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
283
284         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
285
286         proxy.commit().get(5, TimeUnit.SECONDS);
287     }
288
289     @Test(expected = TestException.class)
290     public void testCommitWithFailedCohortPath() throws Throwable {
291
292         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
293
294         try {
295             propagateExecutionExceptionCause(proxy.commit());
296         } finally {
297             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
298         }
299     }
300
301     @Test
302     public void testAllThreePhasesSuccessful() throws Exception {
303
304         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
305
306         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
307                 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
308
309         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
310                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
311
312         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
313                 new CommitTransactionReply(), new CommitTransactionReply());
314
315         proxy.canCommit().get(5, TimeUnit.SECONDS);
316         proxy.preCommit().get(5, TimeUnit.SECONDS);
317         proxy.commit().get(5, TimeUnit.SECONDS);
318
319         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
320         verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
321         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
322     }
323 }