cb3bd60fbf15fb3a4a6018cc9b2a8f4bcca4546b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.isA;
7 import static org.mockito.Mockito.doReturn;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import akka.actor.ActorPath;
11 import akka.actor.ActorSelection;
12 import akka.actor.Props;
13 import akka.dispatch.Futures;
14 import akka.util.Timeout;
15 import com.codahale.metrics.Snapshot;
16 import com.codahale.metrics.Timer;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import java.util.List;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Before;
23 import org.junit.Test;
24 import org.mockito.Mock;
25 import org.mockito.MockitoAnnotations;
26 import org.mockito.stubbing.Stubber;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
38 import scala.concurrent.Future;
39 import scala.concurrent.duration.Duration;
40
41 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
42
43     @SuppressWarnings("serial")
44     static class TestException extends RuntimeException {
45     }
46
47     @Mock
48     private ActorContext actorContext;
49
50     @Mock
51     private DatastoreContext datastoreContext;
52
53     @Mock
54     private Timer commitTimer;
55
56     @Mock
57     private Timer.Context commitTimerContext;
58
59     @Mock
60     private Snapshot commitSnapshot;
61
62     @Before
63     public void setUp() {
64         MockitoAnnotations.initMocks(this);
65
66         doReturn(getSystem()).when(actorContext).getActorSystem();
67         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
68         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
69         doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
70         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
71         doReturn(commitTimerContext).when(commitTimer).time();
72         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
73         for(int i=1;i<11;i++){
74             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
75             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
76             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
77         }
78         doReturn(10.0).when(actorContext).getTxCreationLimit();
79     }
80
81     private Future<ActorSelection> newCohort() {
82         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
83         ActorSelection actorSelection = getSystem().actorSelection(path);
84         return Futures.successful(actorSelection);
85     }
86
87     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
88         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
89         for(int i = 1; i <= nCohorts; i++) {
90             cohortFutures.add(newCohort());
91         }
92
93         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
94     }
95
96     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
97             throws Exception {
98         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
99         cohortFutures.add(newCohort());
100         cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
101
102         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
103     }
104
105     private void setupMockActorContext(Class<?> requestType, Object... responses) {
106         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
107                 .failed((Throwable) responses[0]) : Futures
108                 .successful(((SerializableMessage) responses[0]).toSerializable()));
109
110         for(int i = 1; i < responses.length; i++) {
111             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
112                     .failed((Throwable) responses[i]) : Futures
113                     .successful(((SerializableMessage) responses[i]).toSerializable()));
114         }
115
116         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
117                 isA(requestType), any(Timeout.class));
118
119         doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
120                 .when(actorContext).getTransactionCommitOperationTimeout();
121     }
122
123     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
124         verify(actorContext, times(nCohorts)).executeOperationAsync(
125                 any(ActorSelection.class), isA(requestType), any(Timeout.class));
126     }
127
128     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
129
130         try {
131             future.get(5, TimeUnit.SECONDS);
132             fail("Expected ExecutionException");
133         } catch(ExecutionException e) {
134             throw e.getCause();
135         }
136     }
137
138     @Test
139     public void testCanCommitWithOneCohort() throws Exception {
140
141         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
142
143         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
144                 CanCommitTransactionReply.YES);
145
146         ListenableFuture<Boolean> future = proxy.canCommit();
147
148         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
149
150         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
151                 CanCommitTransactionReply.NO);
152
153         future = proxy.canCommit();
154
155         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
156
157         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
158     }
159
160     @Test
161     public void testCanCommitWithMultipleCohorts() throws Exception {
162
163         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
164
165         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
166                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
167
168         ListenableFuture<Boolean> future = proxy.canCommit();
169
170         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
171
172         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
173     }
174
175     @Test
176     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
177
178         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
179
180         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
181                 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
182
183         ListenableFuture<Boolean> future = proxy.canCommit();
184
185         Boolean actual = future.get(5, TimeUnit.SECONDS);
186
187         assertEquals("canCommit", false, actual);
188
189         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
190     }
191
192     @Test(expected = TestException.class)
193     public void testCanCommitWithExceptionFailure() throws Throwable {
194
195         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
196
197         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
198
199         propagateExecutionExceptionCause(proxy.canCommit());
200     }
201
202     @Test(expected = ExecutionException.class)
203     public void testCanCommitWithInvalidResponseType() throws Exception {
204
205         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
206
207         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
208                 new PreCommitTransactionReply());
209
210         proxy.canCommit().get(5, TimeUnit.SECONDS);
211     }
212
213     @Test(expected = TestException.class)
214     public void testCanCommitWithFailedCohortPath() throws Throwable {
215
216         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
217
218         try {
219             propagateExecutionExceptionCause(proxy.canCommit());
220         } finally {
221             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
222         }
223     }
224
225     @Test
226     public void testPreCommit() throws Exception {
227         // Precommit is currently a no-op
228         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
229
230         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
231                 new PreCommitTransactionReply());
232
233         proxy.preCommit().get(5, TimeUnit.SECONDS);
234     }
235
236     @Test
237     public void testAbort() throws Exception {
238         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
239
240         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
241
242         proxy.abort().get(5, TimeUnit.SECONDS);
243
244         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
245     }
246
247     @Test
248     public void testAbortWithFailure() throws Exception {
249         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
250
251         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
252
253         // The exception should not get propagated.
254         proxy.abort().get(5, TimeUnit.SECONDS);
255
256         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
257     }
258
259     @Test
260     public void testAbortWithFailedCohortPath() throws Throwable {
261
262         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
263
264         // The exception should not get propagated.
265         proxy.abort().get(5, TimeUnit.SECONDS);
266
267         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
268     }
269
270     @Test
271     public void testCommit() throws Exception {
272
273         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
274
275         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
276                 new CommitTransactionReply());
277
278         proxy.commit().get(5, TimeUnit.SECONDS);
279
280         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
281     }
282
283     @Test(expected = TestException.class)
284     public void testCommitWithFailure() throws Throwable {
285
286         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
287
288         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
289                 new TestException());
290
291         propagateExecutionExceptionCause(proxy.commit());
292     }
293
294     @Test(expected = ExecutionException.class)
295     public void testCommitWithInvalidResponseType() throws Exception {
296
297         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
298
299         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
300
301         proxy.commit().get(5, TimeUnit.SECONDS);
302     }
303
304     @Test(expected = TestException.class)
305     public void testCommitWithFailedCohortPath() throws Throwable {
306
307         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
308
309         try {
310             propagateExecutionExceptionCause(proxy.commit());
311         } finally {
312
313             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
314         }
315
316     }
317
318     @Test
319     public void testAllThreePhasesSuccessful() throws Exception {
320
321         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
322
323         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
324                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
325
326         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
327                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
328
329         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
330                 new CommitTransactionReply(), new CommitTransactionReply());
331
332         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
333
334         proxy.canCommit().get(5, TimeUnit.SECONDS);
335         proxy.preCommit().get(5, TimeUnit.SECONDS);
336         proxy.commit().get(5, TimeUnit.SECONDS);
337
338         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
339         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
340
341     }
342
343     @Test
344     public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
345
346         ThreePhaseCommitCohortProxy proxy = setupProxy(0);
347
348         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
349
350         proxy.canCommit().get(5, TimeUnit.SECONDS);
351         proxy.preCommit().get(5, TimeUnit.SECONDS);
352         proxy.commit().get(5, TimeUnit.SECONDS);
353
354     }
355 }