Fix intermittent unit test failures
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
16 import akka.actor.ActorSelection;
17 import akka.actor.Props;
18 import akka.actor.UntypedActor;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.Futures;
21 import akka.testkit.TestActorRef;
22 import com.codahale.metrics.Snapshot;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Supplier;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
38 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
41 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
49 import org.opendaylight.controller.cluster.raft.TestActorFactory;
50 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
51 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
52
53 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
54
55     @SuppressWarnings("serial")
56     static class TestException extends RuntimeException {
57     }
58
59     private ActorContext actorContext;
60
61     @Mock
62     private Timer commitTimer;
63
64     @Mock
65     private Timer.Context commitTimerContext;
66
67     @Mock
68     private Snapshot commitSnapshot;
69
70     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
71     private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
72
73     @Before
74     public void setUp() {
75         MockitoAnnotations.initMocks(this);
76
77         actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
78                 new MockClusterWrapper(), new MockConfiguration(),
79                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) {
80                     @Override
81                     public Timer getOperationTimer(String operationName) {
82                         return commitTimer;
83                     }
84
85                     @Override
86                     public double getTxCreationLimit() {
87                         return 10.0;
88                     }
89                 };
90
91         doReturn(commitTimerContext).when(commitTimer).time();
92         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
93         for(int i=1;i<11;i++){
94             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
95             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
96             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
97         }
98     }
99
100     @Test
101     public void testCanCommitYesWithOneCohort() throws Exception {
102         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
103                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
104                         CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
105
106         verifyCanCommit(proxy.canCommit(), true);
107         verifyCohortActors();
108     }
109
110     @Test
111     public void testCanCommitNoWithOneCohort() throws Exception {
112         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
113                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
114                         CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
115
116         verifyCanCommit(proxy.canCommit(), false);
117         verifyCohortActors();
118     }
119
120     @Test
121     public void testCanCommitYesWithTwoCohorts() throws Exception {
122         List<CohortInfo> cohorts = Arrays.asList(
123                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
124                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
125                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
126                         CanCommitTransactionReply.yes(CURRENT_VERSION))));
127         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
128
129         verifyCanCommit(proxy.canCommit(), true);
130         verifyCohortActors();
131     }
132
133     @Test
134     public void testCanCommitNoWithThreeCohorts() throws Exception {
135         List<CohortInfo> cohorts = Arrays.asList(
136                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
137                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
138                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
139                         CanCommitTransactionReply.no(CURRENT_VERSION))),
140                 newCohortInfo(new CohortActor.Builder("txn-1")));
141         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
142
143         verifyCanCommit(proxy.canCommit(), false);
144         verifyCohortActors();
145     }
146
147     @Test(expected = TestException.class)
148     public void testCanCommitWithExceptionFailure() throws Throwable {
149         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
150                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
151
152         propagateExecutionExceptionCause(proxy.canCommit());
153     }
154
155     @Test(expected = IllegalArgumentException.class)
156     public void testCanCommitWithInvalidResponseType() throws Throwable {
157         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
158                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
159
160         propagateExecutionExceptionCause(proxy.canCommit());
161     }
162
163     @Test(expected = TestException.class)
164     public void testCanCommitWithFailedCohortFuture() throws Throwable {
165         List<CohortInfo> cohorts = Arrays.asList(
166                 newCohortInfo(new CohortActor.Builder("txn-1")),
167                 newCohortInfoWithFailedFuture(new TestException()),
168                 newCohortInfo(new CohortActor.Builder("txn-1")));
169         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
170
171         propagateExecutionExceptionCause(proxy.canCommit());
172     }
173
174     @Test
175     public void testAllThreePhasesSuccessful() throws Exception {
176         List<CohortInfo> cohorts = Arrays.asList(
177                 newCohortInfo(new CohortActor.Builder("txn-1").
178                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
179                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
180                 newCohortInfo(new CohortActor.Builder("txn-1").
181                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
182                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
183         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
184
185         verifyCanCommit(proxy.canCommit(), true);
186         verifySuccessfulFuture(proxy.preCommit());
187         verifySuccessfulFuture(proxy.commit());
188         verifyCohortActors();
189     }
190
191     @Test(expected = TestException.class)
192     public void testCommitWithExceptionFailure() throws Throwable {
193         List<CohortInfo> cohorts = Arrays.asList(
194                 newCohortInfo(new CohortActor.Builder("txn-1").
195                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
196                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
197                 newCohortInfo(new CohortActor.Builder("txn-1").
198                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
199                         expectCommit(new TestException())));
200         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
201
202         verifyCanCommit(proxy.canCommit(), true);
203         verifySuccessfulFuture(proxy.preCommit());
204         propagateExecutionExceptionCause(proxy.commit());
205     }
206
207     @Test(expected = IllegalArgumentException.class)
208     public void testCommitWithInvalidResponseType() throws Throwable {
209         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
210                 newCohortInfo(new CohortActor.Builder("txn-1").
211                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
212                         expectCommit("invalid"))), "txn-1");
213
214         verifyCanCommit(proxy.canCommit(), true);
215         verifySuccessfulFuture(proxy.preCommit());
216         propagateExecutionExceptionCause(proxy.commit());
217     }
218
219     @Test
220     public void testAbort() throws Exception {
221         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
222                 newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
223                         AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
224
225         verifySuccessfulFuture(proxy.abort());
226         verifyCohortActors();
227     }
228
229     @Test
230     public void testAbortWithFailure() throws Exception {
231         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
232                 newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
233
234         // The exception should not get propagated.
235         verifySuccessfulFuture(proxy.abort());
236         verifyCohortActors();
237     }
238
239     @Test
240     public void testAbortWithFailedCohortFuture() throws Throwable {
241         List<CohortInfo> cohorts = Arrays.asList(
242                 newCohortInfoWithFailedFuture(new TestException()),
243                 newCohortInfo(new CohortActor.Builder("txn-1")));
244         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
245
246         verifySuccessfulFuture(proxy.abort());
247         verifyCohortActors();
248     }
249
250     @Test
251     public void testWithNoCohorts() throws Exception {
252         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
253                 Collections.<CohortInfo>emptyList(), "txn-1");
254
255         verifyCanCommit(proxy.canCommit(), true);
256         verifySuccessfulFuture(proxy.preCommit());
257         verifySuccessfulFuture(proxy.commit());
258         verifyCohortActors();
259     }
260
261     @Test
262     public void testBackwardsCompatibilityWithPreBoron() throws Exception {
263         List<CohortInfo> cohorts = Arrays.asList(
264                 newCohortInfo(new CohortActor.Builder("txn-1").
265                         expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class,
266                                 CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)).
267                         expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class,
268                                 CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)),
269                         DataStoreVersions.LITHIUM_VERSION));
270         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
271
272         verifyCanCommit(proxy.canCommit(), true);
273         verifySuccessfulFuture(proxy.preCommit());
274         verifySuccessfulFuture(proxy.commit());
275         verifyCohortActors();
276     }
277
278     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
279
280         try {
281             future.get(5, TimeUnit.SECONDS);
282             fail("Expected ExecutionException");
283         } catch(ExecutionException e) {
284             verifyCohortActors();
285             throw e.getCause();
286         }
287     }
288
289     private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
290         TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props().
291                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
292         cohortActors.add(actor);
293         return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier<Short>() {
294             @Override
295             public Short get() {
296                 return version;
297             }
298         });
299     }
300
301     private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
302         return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
303             @Override
304             public Short get() {
305                 return CURRENT_VERSION;
306             }
307         });
308     }
309
310     private CohortInfo newCohortInfo(CohortActor.Builder builder) {
311         return newCohortInfo(builder, CURRENT_VERSION);
312     }
313
314     private void verifyCohortActors() {
315         for(TestActorRef<CohortActor> actor: cohortActors) {
316             actor.underlyingActor().verify();
317         }
318     }
319
320     private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
321         try {
322             return future.get(5, TimeUnit.SECONDS);
323         } catch(Exception e) {
324             verifyCohortActors();
325             throw e;
326         }
327     }
328
329     private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
330         Boolean actual = verifySuccessfulFuture(future);
331         assertEquals("canCommit", expected, actual);
332     }
333
334     private static class CohortActor extends UntypedActor {
335         private final Builder builder;
336         private final AtomicInteger canCommitCount = new AtomicInteger();
337         private final AtomicInteger commitCount = new AtomicInteger();
338         private final AtomicInteger abortCount = new AtomicInteger();
339         private volatile AssertionError assertionError;
340
341         private CohortActor(Builder builder) {
342             this.builder = builder;
343         }
344
345         @Override
346         public void onReceive(Object message) {
347             if(CanCommitTransaction.isSerializedType(message)) {
348                 canCommitCount.incrementAndGet();
349                 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
350                         builder.expCanCommitType, builder.canCommitReply);
351             } else if(CommitTransaction.isSerializedType(message)) {
352                 commitCount.incrementAndGet();
353                 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
354                         builder.expCommitType, builder.commitReply);
355             } else if(AbortTransaction.isSerializedType(message)) {
356                 abortCount.incrementAndGet();
357                 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
358                         builder.expAbortType, builder.abortReply);
359             } else {
360                 assertionError = new AssertionError("Unexpected message " + message);
361             }
362         }
363
364         private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
365                 Class<?> expType, Object reply) {
366             try {
367                 assertNotNull("Unexpected " + name, expType);
368                 assertEquals(name + " type", expType, rawMessage.getClass());
369                 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID());
370
371                 if(reply instanceof Throwable) {
372                     getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
373                 } else {
374                     getSender().tell(reply, self());
375                 }
376             } catch(AssertionError e) {
377                 assertionError = e;
378             }
379         }
380
381         void verify() {
382             if(assertionError != null) {
383                 throw assertionError;
384             }
385
386             if(builder.expCanCommitType != null) {
387                 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
388             }
389
390             if(builder.expCommitType != null) {
391                 assertEquals("CommitTransaction count", 1, commitCount.get());
392             }
393
394             if(builder.expAbortType != null) {
395                 assertEquals("AbortTransaction count", 1, abortCount.get());
396             }
397         }
398
399         static class Builder {
400             private Class<?> expCanCommitType;
401             private Class<?> expCommitType;
402             private Class<?> expAbortType;
403             private Object canCommitReply;
404             private Object commitReply;
405             private Object abortReply;
406             private final String transactionId;
407
408             Builder(String transactionId) {
409                 this.transactionId = transactionId;
410             }
411
412             Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
413                 this.expCanCommitType = expCanCommitType;
414                 this.canCommitReply = canCommitReply;
415                 return this;
416             }
417
418             Builder expectCanCommit(Object canCommitReply) {
419                 return expectCanCommit(CanCommitTransaction.class, canCommitReply);
420             }
421
422             Builder expectCommit(Class<?> expCommitType, Object commitReply) {
423                 this.expCommitType = expCommitType;
424                 this.commitReply = commitReply;
425                 return this;
426             }
427
428             Builder expectCommit(Object commitReply) {
429                 return expectCommit(CommitTransaction.class, commitReply);
430             }
431
432             Builder expectAbort(Class<?> expAbortType, Object abortReply) {
433                 this.expAbortType = expAbortType;
434                 this.abortReply = abortReply;
435                 return this;
436             }
437
438             Builder expectAbort(Object abortReply) {
439                 return expectAbort(AbortTransaction.class, abortReply);
440             }
441
442             Props props() {
443                 return Props.create(CohortActor.class, this);
444             }
445         }
446     }
447 }