Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
import static java.util.Objects.requireNonNull;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.lenient;
import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;

import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.actor.UntypedAbstractActor;
import akka.dispatch.Dispatchers;
import akka.dispatch.Futures;
import akka.testkit.TestActorRef;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
53
54 @Deprecated(since = "9.0.0", forRemoval = true)
55 @RunWith(MockitoJUnitRunner.StrictStubs.class)
56 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
57     static class TestException extends RuntimeException {
58         private static final long serialVersionUID = 1L;
59
60     }
61
62     private ActorUtils actorUtils;
63
64     @Mock
65     private Timer commitTimer;
66     @Mock
67     private Timer.Context commitTimerContext;
68     @Mock
69     private Snapshot commitSnapshot;
70
71     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
72     private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
73     private final TransactionIdentifier tx = nextTransactionId();
74
75     @Before
76     public void setUp() {
77         actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
78                 new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
79                 new PrimaryShardInfoFutureCache()) {
80             @Override
81             public Timer getOperationTimer(final String operationName) {
82                 return commitTimer;
83             }
84
85             @Override
86             public double getTxCreationLimit() {
87                 return 10.0;
88             }
89         };
90
91         lenient().doReturn(commitTimerContext).when(commitTimer).time();
92         lenient().doReturn(commitSnapshot).when(commitTimer).getSnapshot();
93         for (int i = 1; i < 11; i++) {
94             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
95             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
96             lenient().doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
97         }
98     }
99
100     @Test
101     public void testCanCommitYesWithOneCohort() throws Exception {
102         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
103             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))),
104             tx);
105
106         verifyCanCommit(proxy.canCommit(), true);
107         verifyCohortActors();
108     }
109
110     @Test
111     public void testCanCommitNoWithOneCohort() throws Exception {
112         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
113             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION)))),
114             tx);
115
116         verifyCanCommit(proxy.canCommit(), false);
117         verifyCohortActors();
118     }
119
120     @Test
121     public void testCanCommitYesWithTwoCohorts() throws Exception {
122         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
123             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))),
124             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))),
125             tx);
126
127         verifyCanCommit(proxy.canCommit(), true);
128         verifyCohortActors();
129     }
130
131     @Test
132     public void testCanCommitNoWithThreeCohorts() throws Exception {
133         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
134             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))),
135             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION))),
136             newCohortInfo(new CohortActor.Builder(tx))), tx);
137
138         verifyCanCommit(proxy.canCommit(), false);
139         verifyCohortActors();
140     }
141
142     @Test
143     public void testCanCommitWithExceptionFailure() {
144         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
145             List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
146
147         propagateExecutionExceptionCause(proxy.canCommit(), TestException.class);
148     }
149
150     @Test
151     public void testCanCommitWithInvalidResponseType() {
152         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
153             List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
154
155         assertEquals("Unexpected response type class java.lang.String",
156             propagateExecutionExceptionCause(proxy.canCommit(), IllegalArgumentException.class));
157     }
158
159     @Test
160     public void testCanCommitWithFailedCohortFuture() throws Exception {
161         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
162             newCohortInfo(new CohortActor.Builder(tx)),
163             newCohortInfoWithFailedFuture(new TestException()),
164             newCohortInfo(new CohortActor.Builder(tx))), tx);
165
166         propagateExecutionExceptionCause(proxy.canCommit(), TestException.class);
167     }
168
169     @Test
170     public void testAllThreePhasesSuccessful() throws Exception {
171         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
172             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
173                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
174             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
175                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))), tx);
176
177         verifyCanCommit(proxy.canCommit(), true);
178         verifySuccessfulFuture(proxy.preCommit());
179         verifySuccessfulFuture(proxy.commit());
180         verifyCohortActors();
181     }
182
183     @Test
184     public void testCommitWithExceptionFailure() throws Exception {
185         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
186             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
187                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
188             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
189             .expectCommit(new TestException()))), tx);
190
191         verifyCanCommit(proxy.canCommit(), true);
192         verifySuccessfulFuture(proxy.preCommit());
193         propagateExecutionExceptionCause(proxy.commit(), TestException.class);
194     }
195
196     @Test
197     public void testCommitWithInvalidResponseType() throws Exception {
198         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,List.of(
199             newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
200                 .expectCommit("invalid"))),
201             tx);
202
203         verifyCanCommit(proxy.canCommit(), true);
204         verifySuccessfulFuture(proxy.preCommit());
205         assertEquals("Unexpected response type class java.lang.String",
206             propagateExecutionExceptionCause(proxy.commit(), IllegalArgumentException.class));
207     }
208
209     @Test
210     public void testAbort() throws Exception {
211         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
212             List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(
213                 AbortTransactionReply.instance(CURRENT_VERSION)))),
214             tx);
215
216         verifySuccessfulFuture(proxy.abort());
217         verifyCohortActors();
218     }
219
220     @Test
221     public void testAbortWithFailure() throws Exception {
222         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
223             List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
224
225         // The exception should not get propagated.
226         verifySuccessfulFuture(proxy.abort());
227         verifyCohortActors();
228     }
229
230     @Test
231     public void testAbortWithFailedCohortFuture() throws Exception {
232         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
233             newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx))), tx);
234
235         verifySuccessfulFuture(proxy.abort());
236         verifyCohortActors();
237     }
238
239     @Test
240     public void testWithNoCohorts() throws Exception {
241         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(), tx);
242
243         verifyCanCommit(proxy.canCommit(), true);
244         verifySuccessfulFuture(proxy.preCommit());
245         verifySuccessfulFuture(proxy.commit());
246         verifyCohortActors();
247     }
248
249     private String propagateExecutionExceptionCause(final ListenableFuture<?> future,
250             final Class<? extends Exception> expected) {
251         final var ex = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)).getCause();
252         verifyCohortActors();
253         assertThat(ex, instanceOf(expected));
254         return ex.getMessage();
255     }
256
257     private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
258         TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
259                 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
260         cohortActors.add(actor);
261         return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
262     }
263
264     private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
265         return newCohortInfo(builder, CURRENT_VERSION);
266     }
267
268     private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
269         return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
270     }
271
272     private void verifyCohortActors() {
273         for (TestActorRef<CohortActor> actor: cohortActors) {
274             actor.underlyingActor().verify();
275         }
276     }
277
278     @SuppressWarnings("checkstyle:IllegalCatch")
279     private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
280         try {
281             return future.get(5, TimeUnit.SECONDS);
282         } catch (Exception e) {
283             verifyCohortActors();
284             throw e;
285         }
286     }
287
288     private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
289         Boolean actual = verifySuccessfulFuture(future);
290         assertEquals("canCommit", expected, actual);
291     }
292
293     private static class CohortActor extends UntypedAbstractActor {
294         private final Builder builder;
295         private final AtomicInteger canCommitCount = new AtomicInteger();
296         private final AtomicInteger commitCount = new AtomicInteger();
297         private final AtomicInteger abortCount = new AtomicInteger();
298         private volatile AssertionError assertionError;
299
300         CohortActor(final Builder builder) {
301             this.builder = builder;
302         }
303
304         @Override
305         public void onReceive(final Object message) {
306             if (CanCommitTransaction.isSerializedType(message)) {
307                 canCommitCount.incrementAndGet();
308                 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
309                         builder.expCanCommitType, builder.canCommitReply);
310             } else if (CommitTransaction.isSerializedType(message)) {
311                 commitCount.incrementAndGet();
312                 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
313                         builder.expCommitType, builder.commitReply);
314             } else if (AbortTransaction.isSerializedType(message)) {
315                 abortCount.incrementAndGet();
316                 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
317                         builder.expAbortType, builder.abortReply);
318             } else {
319                 assertionError = new AssertionError("Unexpected message " + message);
320             }
321         }
322
323         private void onMessage(final String name, final Object rawMessage,
324                 final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
325             try {
326                 assertNotNull("Unexpected " + name, expType);
327                 assertEquals(name + " type", expType, rawMessage.getClass());
328                 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId());
329
330                 if (reply instanceof Throwable) {
331                     getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
332                 } else {
333                     getSender().tell(reply, self());
334                 }
335             } catch (AssertionError e) {
336                 assertionError = e;
337             }
338         }
339
340         void verify() {
341             if (assertionError != null) {
342                 throw assertionError;
343             }
344
345             if (builder.expCanCommitType != null) {
346                 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
347             }
348
349             if (builder.expCommitType != null) {
350                 assertEquals("CommitTransaction count", 1, commitCount.get());
351             }
352
353             if (builder.expAbortType != null) {
354                 assertEquals("AbortTransaction count", 1, abortCount.get());
355             }
356         }
357
358         static class Builder {
359             private Class<?> expCanCommitType;
360             private Class<?> expCommitType;
361             private Class<?> expAbortType;
362             private Object canCommitReply;
363             private Object commitReply;
364             private Object abortReply;
365             private final TransactionIdentifier transactionId;
366
367             Builder(final TransactionIdentifier transactionId) {
368                 this.transactionId = requireNonNull(transactionId);
369             }
370
371             Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
372                 expCanCommitType = newExpCanCommitType;
373                 canCommitReply = newCanCommitReply;
374                 return this;
375             }
376
377             Builder expectCanCommit(final Object newCanCommitReply) {
378                 return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
379             }
380
381             Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
382                 expCommitType = newExpCommitType;
383                 commitReply = newCommitReply;
384                 return this;
385             }
386
387             Builder expectCommit(final Object newCommitReply) {
388                 return expectCommit(CommitTransaction.class, newCommitReply);
389             }
390
391             Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
392                 expAbortType = newExpAbortType;
393                 abortReply = newAbortReply;
394                 return this;
395             }
396
397             Builder expectAbort(final Object newAbortReply) {
398                 return expectAbort(AbortTransaction.class, newAbortReply);
399             }
400
401             Props props() {
402                 return Props.create(CohortActor.class, this);
403             }
404         }
405     }
406 }