13d3882a04b0fe0681d3bc7c4cd2cf1bc10ab2a1
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
16
17 import akka.actor.ActorSelection;
18 import akka.actor.Props;
19 import akka.actor.UntypedAbstractActor;
20 import akka.dispatch.Dispatchers;
21 import akka.dispatch.Futures;
22 import akka.testkit.TestActorRef;
23 import com.codahale.metrics.Snapshot;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Throwables;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.mockito.Mock;
37 import org.mockito.MockitoAnnotations;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
40 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
43 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
48 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
49 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
50 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
51 import org.opendaylight.controller.cluster.raft.TestActorFactory;
52 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
53
54 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
55
56     @SuppressWarnings("serial")
57     static class TestException extends RuntimeException {
58     }
59
60     private ActorUtils actorUtils;
61
62     @Mock
63     private Timer commitTimer;
64
65     @Mock
66     private Timer.Context commitTimerContext;
67
68     @Mock
69     private Snapshot commitSnapshot;
70
71     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
72     private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
73     private final TransactionIdentifier tx = nextTransactionId();
74
75
76     @Before
77     public void setUp() {
78         MockitoAnnotations.initMocks(this);
79
80         actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
81                 new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
82                 new PrimaryShardInfoFutureCache()) {
83             @Override
84             public Timer getOperationTimer(final String operationName) {
85                 return commitTimer;
86             }
87
88             @Override
89             public double getTxCreationLimit() {
90                 return 10.0;
91             }
92         };
93
94         doReturn(commitTimerContext).when(commitTimer).time();
95         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
96         for (int i = 1; i < 11; i++) {
97             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
98             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
99             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
100         }
101     }
102
103     @Test
104     public void testCanCommitYesWithOneCohort() throws Exception {
105         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
106                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
107                         CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
108
109         verifyCanCommit(proxy.canCommit(), true);
110         verifyCohortActors();
111     }
112
113     @Test
114     public void testCanCommitNoWithOneCohort() throws Exception {
115         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
116                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
117                         CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
118
119         verifyCanCommit(proxy.canCommit(), false);
120         verifyCohortActors();
121     }
122
123     @Test
124     public void testCanCommitYesWithTwoCohorts() throws Exception {
125         List<CohortInfo> cohorts = Arrays.asList(
126                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
127                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
128                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
129                         CanCommitTransactionReply.yes(CURRENT_VERSION))));
130         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
131
132         verifyCanCommit(proxy.canCommit(), true);
133         verifyCohortActors();
134     }
135
136     @Test
137     public void testCanCommitNoWithThreeCohorts() throws Exception {
138         List<CohortInfo> cohorts = Arrays.asList(
139                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
140                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
141                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
142                         CanCommitTransactionReply.no(CURRENT_VERSION))),
143                 newCohortInfo(new CohortActor.Builder(tx)));
144         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
145
146         verifyCanCommit(proxy.canCommit(), false);
147         verifyCohortActors();
148     }
149
150     @Test(expected = TestException.class)
151     public void testCanCommitWithExceptionFailure() throws Exception {
152         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
153                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
154
155         propagateExecutionExceptionCause(proxy.canCommit());
156     }
157
158     @Test(expected = IllegalArgumentException.class)
159     public void testCanCommitWithInvalidResponseType() throws Exception {
160         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
161                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
162
163         propagateExecutionExceptionCause(proxy.canCommit());
164     }
165
166     @Test(expected = TestException.class)
167     public void testCanCommitWithFailedCohortFuture() throws Exception {
168         List<CohortInfo> cohorts = Arrays.asList(
169                 newCohortInfo(new CohortActor.Builder(tx)),
170                 newCohortInfoWithFailedFuture(new TestException()),
171                 newCohortInfo(new CohortActor.Builder(tx)));
172         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
173
174         propagateExecutionExceptionCause(proxy.canCommit());
175     }
176
177     @Test
178     public void testAllThreePhasesSuccessful() throws Exception {
179         List<CohortInfo> cohorts = Arrays.asList(
180                 newCohortInfo(
181                         new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
182                                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
183                 newCohortInfo(
184                         new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
185                                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
186         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
187
188         verifyCanCommit(proxy.canCommit(), true);
189         verifySuccessfulFuture(proxy.preCommit());
190         verifySuccessfulFuture(proxy.commit());
191         verifyCohortActors();
192     }
193
194     @Test(expected = TestException.class)
195     public void testCommitWithExceptionFailure() throws Exception {
196         List<CohortInfo> cohorts = Arrays.asList(
197                 newCohortInfo(
198                         new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
199                                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
200                 newCohortInfo(
201                         new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
202                                 .expectCommit(new TestException())));
203         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
204
205         verifyCanCommit(proxy.canCommit(), true);
206         verifySuccessfulFuture(proxy.preCommit());
207         propagateExecutionExceptionCause(proxy.commit());
208     }
209
210     @Test(expected = IllegalArgumentException.class)
211     public void testCommitWithInvalidResponseType() throws Exception {
212         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
213                 Arrays.asList(newCohortInfo(new CohortActor.Builder(tx)
214                         .expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).expectCommit("invalid"))), tx);
215
216         verifyCanCommit(proxy.canCommit(), true);
217         verifySuccessfulFuture(proxy.preCommit());
218         propagateExecutionExceptionCause(proxy.commit());
219     }
220
221     @Test
222     public void testAbort() throws Exception {
223         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
224                 newCohortInfo(new CohortActor.Builder(tx).expectAbort(
225                         AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
226
227         verifySuccessfulFuture(proxy.abort());
228         verifyCohortActors();
229     }
230
231     @Test
232     public void testAbortWithFailure() throws Exception {
233         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
234                 newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
235
236         // The exception should not get propagated.
237         verifySuccessfulFuture(proxy.abort());
238         verifyCohortActors();
239     }
240
241     @Test
242     public void testAbortWithFailedCohortFuture() throws Exception {
243         List<CohortInfo> cohorts = Arrays.asList(
244                 newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
245         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
246
247         verifySuccessfulFuture(proxy.abort());
248         verifyCohortActors();
249     }
250
251     @Test
252     public void testWithNoCohorts() throws Exception {
253         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
254                 Collections.<CohortInfo>emptyList(), tx);
255
256         verifyCanCommit(proxy.canCommit(), true);
257         verifySuccessfulFuture(proxy.preCommit());
258         verifySuccessfulFuture(proxy.commit());
259         verifyCohortActors();
260     }
261
262     @SuppressWarnings("checkstyle:avoidHidingCauseException")
263     private void propagateExecutionExceptionCause(final ListenableFuture<?> future) throws Exception {
264         try {
265             future.get(5, TimeUnit.SECONDS);
266             fail("Expected ExecutionException");
267         } catch (ExecutionException e) {
268             verifyCohortActors();
269             Throwables.propagateIfPossible(e.getCause(), Exception.class);
270             throw new RuntimeException(e.getCause());
271         }
272     }
273
274     private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
275         TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
276                 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
277         cohortActors.add(actor);
278         return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
279     }
280
281     private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
282         return newCohortInfo(builder, CURRENT_VERSION);
283     }
284
285     private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
286         return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
287     }
288
289     private void verifyCohortActors() {
290         for (TestActorRef<CohortActor> actor: cohortActors) {
291             actor.underlyingActor().verify();
292         }
293     }
294
295     @SuppressWarnings("checkstyle:IllegalCatch")
296     private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
297         try {
298             return future.get(5, TimeUnit.SECONDS);
299         } catch (Exception e) {
300             verifyCohortActors();
301             throw e;
302         }
303     }
304
305     private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
306         Boolean actual = verifySuccessfulFuture(future);
307         assertEquals("canCommit", expected, actual);
308     }
309
310     private static class CohortActor extends UntypedAbstractActor {
311         private final Builder builder;
312         private final AtomicInteger canCommitCount = new AtomicInteger();
313         private final AtomicInteger commitCount = new AtomicInteger();
314         private final AtomicInteger abortCount = new AtomicInteger();
315         private volatile AssertionError assertionError;
316
317         CohortActor(final Builder builder) {
318             this.builder = builder;
319         }
320
321         @Override
322         public void onReceive(final Object message) {
323             if (CanCommitTransaction.isSerializedType(message)) {
324                 canCommitCount.incrementAndGet();
325                 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
326                         builder.expCanCommitType, builder.canCommitReply);
327             } else if (CommitTransaction.isSerializedType(message)) {
328                 commitCount.incrementAndGet();
329                 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
330                         builder.expCommitType, builder.commitReply);
331             } else if (AbortTransaction.isSerializedType(message)) {
332                 abortCount.incrementAndGet();
333                 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
334                         builder.expAbortType, builder.abortReply);
335             } else {
336                 assertionError = new AssertionError("Unexpected message " + message);
337             }
338         }
339
340         private void onMessage(final String name, final Object rawMessage,
341                 final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
342             try {
343                 assertNotNull("Unexpected " + name, expType);
344                 assertEquals(name + " type", expType, rawMessage.getClass());
345                 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId());
346
347                 if (reply instanceof Throwable) {
348                     getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
349                 } else {
350                     getSender().tell(reply, self());
351                 }
352             } catch (AssertionError e) {
353                 assertionError = e;
354             }
355         }
356
357         void verify() {
358             if (assertionError != null) {
359                 throw assertionError;
360             }
361
362             if (builder.expCanCommitType != null) {
363                 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
364             }
365
366             if (builder.expCommitType != null) {
367                 assertEquals("CommitTransaction count", 1, commitCount.get());
368             }
369
370             if (builder.expAbortType != null) {
371                 assertEquals("AbortTransaction count", 1, abortCount.get());
372             }
373         }
374
375         static class Builder {
376             private Class<?> expCanCommitType;
377             private Class<?> expCommitType;
378             private Class<?> expAbortType;
379             private Object canCommitReply;
380             private Object commitReply;
381             private Object abortReply;
382             private final TransactionIdentifier transactionId;
383
384             Builder(final TransactionIdentifier transactionId) {
385                 this.transactionId = requireNonNull(transactionId);
386             }
387
388             Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
389                 this.expCanCommitType = newExpCanCommitType;
390                 this.canCommitReply = newCanCommitReply;
391                 return this;
392             }
393
394             Builder expectCanCommit(final Object newCanCommitReply) {
395                 return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
396             }
397
398             Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
399                 this.expCommitType = newExpCommitType;
400                 this.commitReply = newCommitReply;
401                 return this;
402             }
403
404             Builder expectCommit(final Object newCommitReply) {
405                 return expectCommit(CommitTransaction.class, newCommitReply);
406             }
407
408             Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
409                 this.expAbortType = newExpAbortType;
410                 this.abortReply = newAbortReply;
411                 return this;
412             }
413
414             Builder expectAbort(final Object newAbortReply) {
415                 return expectAbort(AbortTransaction.class, newAbortReply);
416             }
417
418             Props props() {
419                 return Props.create(CohortActor.class, this);
420             }
421         }
422     }
423 }