Modernize ThreePhaseCommitCohortProxyTest
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.hamcrest.CoreMatchers.instanceOf;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.assertNotNull;
15 import static org.junit.Assert.assertThrows;
16 import static org.mockito.Mockito.lenient;
17 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
18
19 import akka.actor.ActorSelection;
20 import akka.actor.Props;
21 import akka.actor.UntypedAbstractActor;
22 import akka.dispatch.Dispatchers;
23 import akka.dispatch.Futures;
24 import akka.testkit.TestActorRef;
25 import com.codahale.metrics.Snapshot;
26 import com.codahale.metrics.Timer;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.junit.runner.RunWith;
36 import org.mockito.Mock;
37 import org.mockito.junit.MockitoJUnitRunner;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
40 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
43 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
48 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
49 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
50 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
51 import org.opendaylight.controller.cluster.raft.TestActorFactory;
52 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
53
54 @RunWith(MockitoJUnitRunner.StrictStubs.class)
55 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
56     static class TestException extends RuntimeException {
57         private static final long serialVersionUID = 1L;
58
59     }
60
61     private ActorUtils actorUtils;
62
63     @Mock
64     private Timer commitTimer;
65     @Mock
66     private Timer.Context commitTimerContext;
67     @Mock
68     private Snapshot commitSnapshot;
69
70     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
71     private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
72     private final TransactionIdentifier tx = nextTransactionId();
73
74     @Before
75     public void setUp() {
76         actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
77                 new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
78                 new PrimaryShardInfoFutureCache()) {
79             @Override
80             public Timer getOperationTimer(final String operationName) {
81                 return commitTimer;
82             }
83
84             @Override
85             public double getTxCreationLimit() {
86                 return 10.0;
87             }
88         };
89
90         lenient().doReturn(commitTimerContext).when(commitTimer).time();
91         lenient().doReturn(commitSnapshot).when(commitTimer).getSnapshot();
92         for (int i = 1; i < 11; i++) {
93             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
94             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
95             lenient().doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
96         }
97     }
98
99     @Test
100     public void testCanCommitYesWithOneCohort() throws Exception {
101         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
102             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))),
103             tx);
104
105         verifyCanCommit(proxy.canCommit(), true);
106         verifyCohortActors();
107     }
108
109     @Test
110     public void testCanCommitNoWithOneCohort() throws Exception {
111         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
112             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION)))),
113             tx);
114
115         verifyCanCommit(proxy.canCommit(), false);
116         verifyCohortActors();
117     }
118
119     @Test
120     public void testCanCommitYesWithTwoCohorts() throws Exception {
121         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
122             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))),
123             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))),
124             tx);
125
126         verifyCanCommit(proxy.canCommit(), true);
127         verifyCohortActors();
128     }
129
130     @Test
131     public void testCanCommitNoWithThreeCohorts() throws Exception {
132         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
133             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))),
134             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION))),
135             newCohortInfo(new CohortActor.Builder(tx))), tx);
136
137         verifyCanCommit(proxy.canCommit(), false);
138         verifyCohortActors();
139     }
140
141     @Test
142     public void testCanCommitWithExceptionFailure() {
143         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
144             List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
145
146         propagateExecutionExceptionCause(proxy.canCommit(), TestException.class);
147     }
148
149     @Test
150     public void testCanCommitWithInvalidResponseType() {
151         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
152             List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
153
154         assertEquals("Unexpected response type class java.lang.String",
155             propagateExecutionExceptionCause(proxy.canCommit(), IllegalArgumentException.class));
156     }
157
158     @Test
159     public void testCanCommitWithFailedCohortFuture() throws Exception {
160         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
161             newCohortInfo(new CohortActor.Builder(tx)),
162             newCohortInfoWithFailedFuture(new TestException()),
163             newCohortInfo(new CohortActor.Builder(tx))), tx);
164
165         propagateExecutionExceptionCause(proxy.canCommit(), TestException.class);
166     }
167
168     @Test
169     public void testAllThreePhasesSuccessful() throws Exception {
170         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
171             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
172                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
173             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
174                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))), tx);
175
176         verifyCanCommit(proxy.canCommit(), true);
177         verifySuccessfulFuture(proxy.preCommit());
178         verifySuccessfulFuture(proxy.commit());
179         verifyCohortActors();
180     }
181
182     @Test
183     public void testCommitWithExceptionFailure() throws Exception {
184         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
185             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
186                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
187             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
188             .expectCommit(new TestException()))), tx);
189
190         verifyCanCommit(proxy.canCommit(), true);
191         verifySuccessfulFuture(proxy.preCommit());
192         propagateExecutionExceptionCause(proxy.commit(), TestException.class);
193     }
194
195     @Test
196     public void testCommitWithInvalidResponseType() throws Exception {
197         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,List.of(
198             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
199                 .expectCommit("invalid"))),
200             tx);
201
202         verifyCanCommit(proxy.canCommit(), true);
203         verifySuccessfulFuture(proxy.preCommit());
204         assertEquals("Unexpected response type class java.lang.String",
205             propagateExecutionExceptionCause(proxy.commit(), IllegalArgumentException.class));
206     }
207
208     @Test
209     public void testAbort() throws Exception {
210         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
211             List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(
212                 AbortTransactionReply.instance(CURRENT_VERSION)))),
213             tx);
214
215         verifySuccessfulFuture(proxy.abort());
216         verifyCohortActors();
217     }
218
219     @Test
220     public void testAbortWithFailure() throws Exception {
221         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
222             List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
223
224         // The exception should not get propagated.
225         verifySuccessfulFuture(proxy.abort());
226         verifyCohortActors();
227     }
228
229     @Test
230     public void testAbortWithFailedCohortFuture() throws Exception {
231         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
232             newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx))), tx);
233
234         verifySuccessfulFuture(proxy.abort());
235         verifyCohortActors();
236     }
237
238     @Test
239     public void testWithNoCohorts() throws Exception {
240         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(), tx);
241
242         verifyCanCommit(proxy.canCommit(), true);
243         verifySuccessfulFuture(proxy.preCommit());
244         verifySuccessfulFuture(proxy.commit());
245         verifyCohortActors();
246     }
247
248     private String propagateExecutionExceptionCause(final ListenableFuture<?> future,
249             final Class<? extends Exception> expected) {
250         final var ex = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)).getCause();
251         verifyCohortActors();
252         assertThat(ex, instanceOf(expected));
253         return ex.getMessage();
254     }
255
256     private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
257         TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
258                 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
259         cohortActors.add(actor);
260         return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
261     }
262
263     private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
264         return newCohortInfo(builder, CURRENT_VERSION);
265     }
266
267     private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
268         return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
269     }
270
271     private void verifyCohortActors() {
272         for (TestActorRef<CohortActor> actor: cohortActors) {
273             actor.underlyingActor().verify();
274         }
275     }
276
277     @SuppressWarnings("checkstyle:IllegalCatch")
278     private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
279         try {
280             return future.get(5, TimeUnit.SECONDS);
281         } catch (Exception e) {
282             verifyCohortActors();
283             throw e;
284         }
285     }
286
287     private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
288         Boolean actual = verifySuccessfulFuture(future);
289         assertEquals("canCommit", expected, actual);
290     }
291
292     private static class CohortActor extends UntypedAbstractActor {
293         private final Builder builder;
294         private final AtomicInteger canCommitCount = new AtomicInteger();
295         private final AtomicInteger commitCount = new AtomicInteger();
296         private final AtomicInteger abortCount = new AtomicInteger();
297         private volatile AssertionError assertionError;
298
299         CohortActor(final Builder builder) {
300             this.builder = builder;
301         }
302
303         @Override
304         public void onReceive(final Object message) {
305             if (CanCommitTransaction.isSerializedType(message)) {
306                 canCommitCount.incrementAndGet();
307                 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
308                         builder.expCanCommitType, builder.canCommitReply);
309             } else if (CommitTransaction.isSerializedType(message)) {
310                 commitCount.incrementAndGet();
311                 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
312                         builder.expCommitType, builder.commitReply);
313             } else if (AbortTransaction.isSerializedType(message)) {
314                 abortCount.incrementAndGet();
315                 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
316                         builder.expAbortType, builder.abortReply);
317             } else {
318                 assertionError = new AssertionError("Unexpected message " + message);
319             }
320         }
321
322         private void onMessage(final String name, final Object rawMessage,
323                 final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
324             try {
325                 assertNotNull("Unexpected " + name, expType);
326                 assertEquals(name + " type", expType, rawMessage.getClass());
327                 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId());
328
329                 if (reply instanceof Throwable) {
330                     getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
331                 } else {
332                     getSender().tell(reply, self());
333                 }
334             } catch (AssertionError e) {
335                 assertionError = e;
336             }
337         }
338
339         void verify() {
340             if (assertionError != null) {
341                 throw assertionError;
342             }
343
344             if (builder.expCanCommitType != null) {
345                 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
346             }
347
348             if (builder.expCommitType != null) {
349                 assertEquals("CommitTransaction count", 1, commitCount.get());
350             }
351
352             if (builder.expAbortType != null) {
353                 assertEquals("AbortTransaction count", 1, abortCount.get());
354             }
355         }
356
357         static class Builder {
358             private Class<?> expCanCommitType;
359             private Class<?> expCommitType;
360             private Class<?> expAbortType;
361             private Object canCommitReply;
362             private Object commitReply;
363             private Object abortReply;
364             private final TransactionIdentifier transactionId;
365
366             Builder(final TransactionIdentifier transactionId) {
367                 this.transactionId = requireNonNull(transactionId);
368             }
369
370             Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
371                 expCanCommitType = newExpCanCommitType;
372                 canCommitReply = newCanCommitReply;
373                 return this;
374             }
375
376             Builder expectCanCommit(final Object newCanCommitReply) {
377                 return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
378             }
379
380             Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
381                 expCommitType = newExpCommitType;
382                 commitReply = newCommitReply;
383                 return this;
384             }
385
386             Builder expectCommit(final Object newCommitReply) {
387                 return expectCommit(CommitTransaction.class, newCommitReply);
388             }
389
390             Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
391                 expAbortType = newExpAbortType;
392                 abortReply = newAbortReply;
393                 return this;
394             }
395
396             Builder expectAbort(final Object newAbortReply) {
397                 return expectAbort(AbortTransaction.class, newAbortReply);
398             }
399
400             Props props() {
401                 return Props.create(CohortActor.class, this);
402             }
403         }
404     }
405 }