Merge changes I3e404877,Ida2a5c32,I9e6ce426,I6a4b90f6,I79717533
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.isA;
7 import static org.mockito.Mockito.doReturn;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import akka.actor.ActorPath;
11 import akka.actor.ActorSelection;
12 import akka.actor.Props;
13 import akka.dispatch.Futures;
14 import com.google.common.collect.Lists;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.List;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.TimeUnit;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.Mock;
22 import org.mockito.MockitoAnnotations;
23 import org.mockito.stubbing.Stubber;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
34 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
35 import scala.concurrent.Future;
36
37 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
38
39     @SuppressWarnings("serial")
40     static class TestException extends RuntimeException {
41     }
42
43     @Mock
44     private ActorContext actorContext;
45
46     @Before
47     public void setUp() {
48         MockitoAnnotations.initMocks(this);
49
50         doReturn(getSystem()).when(actorContext).getActorSystem();
51     }
52
53     private Future<ActorSelection> newCohort() {
54         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
55         ActorSelection actorSelection = getSystem().actorSelection(path);
56         return Futures.successful(actorSelection);
57     }
58
59     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
60         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
61         for(int i = 1; i <= nCohorts; i++) {
62             cohortFutures.add(newCohort());
63         }
64
65         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
66     }
67
68     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
69             throws Exception {
70         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
71         cohortFutures.add(newCohort());
72         cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
73
74         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
75     }
76
77     private void setupMockActorContext(Class<?> requestType, Object... responses) {
78         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
79                 .failed((Throwable) responses[0]) : Futures
80                 .successful(((SerializableMessage) responses[0]).toSerializable()));
81
82         for(int i = 1; i < responses.length; i++) {
83             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
84                     .failed((Throwable) responses[i]) : Futures
85                     .successful(((SerializableMessage) responses[i]).toSerializable()));
86         }
87
88         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
89                 isA(requestType));
90     }
91
92     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
93         verify(actorContext, times(nCohorts)).executeOperationAsync(
94                 any(ActorSelection.class), isA(requestType));
95     }
96
97     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
98
99         try {
100             future.get(5, TimeUnit.SECONDS);
101             fail("Expected ExecutionException");
102         } catch(ExecutionException e) {
103             throw e.getCause();
104         }
105     }
106
107     @Test
108     public void testCanCommitWithOneCohort() throws Exception {
109
110         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
111
112         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
113                 CanCommitTransactionReply.YES);
114
115         ListenableFuture<Boolean> future = proxy.canCommit();
116
117         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
118
119         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
120                 CanCommitTransactionReply.NO);
121
122         future = proxy.canCommit();
123
124         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
125
126         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
127     }
128
129     @Test
130     public void testCanCommitWithMultipleCohorts() throws Exception {
131
132         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
133
134         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
135                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
136
137         ListenableFuture<Boolean> future = proxy.canCommit();
138
139         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
140
141         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
142     }
143
144     @Test
145     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
146
147         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
148
149         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
150                 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
151
152         ListenableFuture<Boolean> future = proxy.canCommit();
153
154         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
155
156         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
157     }
158
159     @Test(expected = TestException.class)
160     public void testCanCommitWithExceptionFailure() throws Throwable {
161
162         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
163
164         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
165
166         propagateExecutionExceptionCause(proxy.canCommit());
167     }
168
169     @Test(expected = ExecutionException.class)
170     public void testCanCommitWithInvalidResponseType() throws Exception {
171
172         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
173
174         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
175                 new PreCommitTransactionReply());
176
177         proxy.canCommit().get(5, TimeUnit.SECONDS);
178     }
179
180     @Test(expected = TestException.class)
181     public void testCanCommitWithFailedCohortPath() throws Throwable {
182
183         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
184
185         try {
186             propagateExecutionExceptionCause(proxy.canCommit());
187         } finally {
188             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
189         }
190     }
191
192     @Test
193     public void testPreCommit() throws Exception {
194         // Precommit is currently a no-op
195         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
196
197         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
198                 new PreCommitTransactionReply());
199
200         proxy.preCommit().get(5, TimeUnit.SECONDS);
201     }
202
203     @Test
204     public void testAbort() throws Exception {
205         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
206
207         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
208
209         proxy.abort().get(5, TimeUnit.SECONDS);
210
211         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
212     }
213
214     @Test
215     public void testAbortWithFailure() throws Exception {
216         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
217
218         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
219
220         // The exception should not get propagated.
221         proxy.abort().get(5, TimeUnit.SECONDS);
222
223         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
224     }
225
226     @Test
227     public void testAbortWithFailedCohortPath() throws Throwable {
228
229         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
230
231         // The exception should not get propagated.
232         proxy.abort().get(5, TimeUnit.SECONDS);
233
234         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
235     }
236
237     @Test
238     public void testCommit() throws Exception {
239
240         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
241
242         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
243                 new CommitTransactionReply());
244
245         proxy.commit().get(5, TimeUnit.SECONDS);
246
247         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
248     }
249
250     @Test(expected = TestException.class)
251     public void testCommitWithFailure() throws Throwable {
252
253         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
254
255         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
256                 new TestException());
257
258         propagateExecutionExceptionCause(proxy.commit());
259     }
260
261     @Test(expected = ExecutionException.class)
262     public void testCommitWithInvalidResponseType() throws Exception {
263
264         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
265
266         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
267
268         proxy.commit().get(5, TimeUnit.SECONDS);
269     }
270
271     @Test(expected = TestException.class)
272     public void testCommitWithFailedCohortPath() throws Throwable {
273
274         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
275
276         try {
277             propagateExecutionExceptionCause(proxy.commit());
278         } finally {
279             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
280         }
281     }
282
283     @Test
284     public void testAllThreePhasesSuccessful() throws Exception {
285
286         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
287
288         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
289                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
290
291         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
292                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
293
294         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
295                 new CommitTransactionReply(), new CommitTransactionReply());
296
297         proxy.canCommit().get(5, TimeUnit.SECONDS);
298         proxy.preCommit().get(5, TimeUnit.SECONDS);
299         proxy.commit().get(5, TimeUnit.SECONDS);
300
301         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
302         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
303     }
304 }