ba7295bcea75a7a3b4f9806549065211a0290f13
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
16
17 import akka.actor.ActorSelection;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.dispatch.Futures;
22 import akka.testkit.TestActorRef;
23 import com.codahale.metrics.Snapshot;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Throwables;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.mockito.Mock;
38 import org.mockito.MockitoAnnotations;
39 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
40 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
41 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
44 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
48 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
49 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
50 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
51 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
52 import org.opendaylight.controller.cluster.raft.TestActorFactory;
53 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
54
55 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
56
57     @SuppressWarnings("serial")
58     static class TestException extends RuntimeException {
59     }
60
61     private ActorContext actorContext;
62
63     @Mock
64     private Timer commitTimer;
65
66     @Mock
67     private Timer.Context commitTimerContext;
68
69     @Mock
70     private Snapshot commitSnapshot;
71
72     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
73     private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
74     private final TransactionIdentifier tx = nextTransactionId();
75
76
77     @Before
78     public void setUp() {
79         MockitoAnnotations.initMocks(this);
80
81         actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
82                 new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
83                 new PrimaryShardInfoFutureCache()) {
84             @Override
85             public Timer getOperationTimer(final String operationName) {
86                 return commitTimer;
87             }
88
89             @Override
90             public double getTxCreationLimit() {
91                 return 10.0;
92             }
93         };
94
95         doReturn(commitTimerContext).when(commitTimer).time();
96         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
97         for (int i = 1; i < 11; i++) {
98             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
99             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
100             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
101         }
102     }
103
104     @Test
105     public void testCanCommitYesWithOneCohort() throws Exception {
106         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
107                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
108                         CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
109
110         verifyCanCommit(proxy.canCommit(), true);
111         verifyCohortActors();
112     }
113
114     @Test
115     public void testCanCommitNoWithOneCohort() throws Exception {
116         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
117                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
118                         CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
119
120         verifyCanCommit(proxy.canCommit(), false);
121         verifyCohortActors();
122     }
123
124     @Test
125     public void testCanCommitYesWithTwoCohorts() throws Exception {
126         List<CohortInfo> cohorts = Arrays.asList(
127                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
128                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
129                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
130                         CanCommitTransactionReply.yes(CURRENT_VERSION))));
131         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
132
133         verifyCanCommit(proxy.canCommit(), true);
134         verifyCohortActors();
135     }
136
137     @Test
138     public void testCanCommitNoWithThreeCohorts() throws Exception {
139         List<CohortInfo> cohorts = Arrays.asList(
140                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
141                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
142                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
143                         CanCommitTransactionReply.no(CURRENT_VERSION))),
144                 newCohortInfo(new CohortActor.Builder(tx)));
145         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
146
147         verifyCanCommit(proxy.canCommit(), false);
148         verifyCohortActors();
149     }
150
151     @Test(expected = TestException.class)
152     public void testCanCommitWithExceptionFailure() throws Exception {
153         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
154                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
155
156         propagateExecutionExceptionCause(proxy.canCommit());
157     }
158
159     @Test(expected = IllegalArgumentException.class)
160     public void testCanCommitWithInvalidResponseType() throws Exception {
161         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
162                 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
163
164         propagateExecutionExceptionCause(proxy.canCommit());
165     }
166
167     @Test(expected = TestException.class)
168     public void testCanCommitWithFailedCohortFuture() throws Exception {
169         List<CohortInfo> cohorts = Arrays.asList(
170                 newCohortInfo(new CohortActor.Builder(tx)),
171                 newCohortInfoWithFailedFuture(new TestException()),
172                 newCohortInfo(new CohortActor.Builder(tx)));
173         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
174
175         propagateExecutionExceptionCause(proxy.canCommit());
176     }
177
178     @Test
179     public void testAllThreePhasesSuccessful() throws Exception {
180         List<CohortInfo> cohorts = Arrays.asList(
181                 newCohortInfo(
182                         new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
183                                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
184                 newCohortInfo(
185                         new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
186                                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
187         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
188
189         verifyCanCommit(proxy.canCommit(), true);
190         verifySuccessfulFuture(proxy.preCommit());
191         verifySuccessfulFuture(proxy.commit());
192         verifyCohortActors();
193     }
194
195     @Test(expected = TestException.class)
196     public void testCommitWithExceptionFailure() throws Exception {
197         List<CohortInfo> cohorts = Arrays.asList(
198                 newCohortInfo(
199                         new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
200                                 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
201                 newCohortInfo(
202                         new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
203                                 .expectCommit(new TestException())));
204         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
205
206         verifyCanCommit(proxy.canCommit(), true);
207         verifySuccessfulFuture(proxy.preCommit());
208         propagateExecutionExceptionCause(proxy.commit());
209     }
210
211     @Test(expected = IllegalArgumentException.class)
212     public void testCommitWithInvalidResponseType() throws Exception {
213         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
214                 Arrays.asList(newCohortInfo(new CohortActor.Builder(tx)
215                         .expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).expectCommit("invalid"))), tx);
216
217         verifyCanCommit(proxy.canCommit(), true);
218         verifySuccessfulFuture(proxy.preCommit());
219         propagateExecutionExceptionCause(proxy.commit());
220     }
221
222     @Test
223     public void testAbort() throws Exception {
224         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
225                 newCohortInfo(new CohortActor.Builder(tx).expectAbort(
226                         AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
227
228         verifySuccessfulFuture(proxy.abort());
229         verifyCohortActors();
230     }
231
232     @Test
233     public void testAbortWithFailure() throws Exception {
234         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
235                 newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
236
237         // The exception should not get propagated.
238         verifySuccessfulFuture(proxy.abort());
239         verifyCohortActors();
240     }
241
242     @Test
243     public void testAbortWithFailedCohortFuture() throws Exception {
244         List<CohortInfo> cohorts = Arrays.asList(
245                 newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
246         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
247
248         verifySuccessfulFuture(proxy.abort());
249         verifyCohortActors();
250     }
251
252     @Test
253     public void testWithNoCohorts() throws Exception {
254         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
255                 Collections.<CohortInfo>emptyList(), tx);
256
257         verifyCanCommit(proxy.canCommit(), true);
258         verifySuccessfulFuture(proxy.preCommit());
259         verifySuccessfulFuture(proxy.commit());
260         verifyCohortActors();
261     }
262
263     @SuppressWarnings("checkstyle:avoidHidingCauseException")
264     private void propagateExecutionExceptionCause(final ListenableFuture<?> future) throws Exception {
265         try {
266             future.get(5, TimeUnit.SECONDS);
267             fail("Expected ExecutionException");
268         } catch (ExecutionException e) {
269             verifyCohortActors();
270             Throwables.propagateIfPossible(e.getCause(), Exception.class);
271             throw new RuntimeException(e.getCause());
272         }
273     }
274
275     private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
276         TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
277                 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
278         cohortActors.add(actor);
279         return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
280     }
281
282     private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
283         return newCohortInfo(builder, CURRENT_VERSION);
284     }
285
286     private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
287         return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
288     }
289
290     private void verifyCohortActors() {
291         for (TestActorRef<CohortActor> actor: cohortActors) {
292             actor.underlyingActor().verify();
293         }
294     }
295
296     @SuppressWarnings("checkstyle:IllegalCatch")
297     private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
298         try {
299             return future.get(5, TimeUnit.SECONDS);
300         } catch (Exception e) {
301             verifyCohortActors();
302             throw e;
303         }
304     }
305
306     private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
307         Boolean actual = verifySuccessfulFuture(future);
308         assertEquals("canCommit", expected, actual);
309     }
310
311     private static class CohortActor extends UntypedActor {
312         private final Builder builder;
313         private final AtomicInteger canCommitCount = new AtomicInteger();
314         private final AtomicInteger commitCount = new AtomicInteger();
315         private final AtomicInteger abortCount = new AtomicInteger();
316         private volatile AssertionError assertionError;
317
318         CohortActor(final Builder builder) {
319             this.builder = builder;
320         }
321
322         @Override
323         public void onReceive(final Object message) {
324             if (CanCommitTransaction.isSerializedType(message)) {
325                 canCommitCount.incrementAndGet();
326                 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
327                         builder.expCanCommitType, builder.canCommitReply);
328             } else if (CommitTransaction.isSerializedType(message)) {
329                 commitCount.incrementAndGet();
330                 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
331                         builder.expCommitType, builder.commitReply);
332             } else if (AbortTransaction.isSerializedType(message)) {
333                 abortCount.incrementAndGet();
334                 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
335                         builder.expAbortType, builder.abortReply);
336             } else {
337                 assertionError = new AssertionError("Unexpected message " + message);
338             }
339         }
340
341         private void onMessage(final String name, final Object rawMessage,
342                 final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
343             try {
344                 assertNotNull("Unexpected " + name, expType);
345                 assertEquals(name + " type", expType, rawMessage.getClass());
346                 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId());
347
348                 if (reply instanceof Throwable) {
349                     getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
350                 } else {
351                     getSender().tell(reply, self());
352                 }
353             } catch (AssertionError e) {
354                 assertionError = e;
355             }
356         }
357
358         void verify() {
359             if (assertionError != null) {
360                 throw assertionError;
361             }
362
363             if (builder.expCanCommitType != null) {
364                 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
365             }
366
367             if (builder.expCommitType != null) {
368                 assertEquals("CommitTransaction count", 1, commitCount.get());
369             }
370
371             if (builder.expAbortType != null) {
372                 assertEquals("AbortTransaction count", 1, abortCount.get());
373             }
374         }
375
376         static class Builder {
377             private Class<?> expCanCommitType;
378             private Class<?> expCommitType;
379             private Class<?> expAbortType;
380             private Object canCommitReply;
381             private Object commitReply;
382             private Object abortReply;
383             private final TransactionIdentifier transactionId;
384
385             Builder(final TransactionIdentifier transactionId) {
386                 this.transactionId = Preconditions.checkNotNull(transactionId);
387             }
388
389             Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
390                 this.expCanCommitType = newExpCanCommitType;
391                 this.canCommitReply = newCanCommitReply;
392                 return this;
393             }
394
395             Builder expectCanCommit(final Object newCanCommitReply) {
396                 return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
397             }
398
399             Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
400                 this.expCommitType = newExpCommitType;
401                 this.commitReply = newCommitReply;
402                 return this;
403             }
404
405             Builder expectCommit(final Object newCommitReply) {
406                 return expectCommit(CommitTransaction.class, newCommitReply);
407             }
408
409             Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
410                 this.expAbortType = newExpAbortType;
411                 this.abortReply = newAbortReply;
412                 return this;
413             }
414
415             Builder expectAbort(final Object newAbortReply) {
416                 return expectAbort(AbortTransaction.class, newAbortReply);
417             }
418
419             Props props() {
420                 return Props.create(CohortActor.class, this);
421             }
422         }
423     }
424 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.