2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
17 import akka.actor.ActorSelection;
18 import akka.actor.Props;
19 import akka.actor.UntypedAbstractActor;
20 import akka.dispatch.Dispatchers;
21 import akka.dispatch.Futures;
22 import akka.testkit.TestActorRef;
23 import com.codahale.metrics.Snapshot;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Throwables;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.concurrent.ExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.mockito.Mock;
37 import org.mockito.MockitoAnnotations;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
40 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
43 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
48 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
49 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
50 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
51 import org.opendaylight.controller.cluster.raft.TestActorFactory;
52 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
54 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
// Unchecked exception used by the tests as a recognisable failure: it is handed to cohort builders as a reply,
// used to fail a cohort future, and named in the @Test(expected = ...) declarations below.
56 @SuppressWarnings("serial")
57 static class TestException extends RuntimeException {
// ActorUtils instance passed to every ThreePhaseCommitCohortProxy constructed by the tests.
60 private ActorUtils actorUtils;
// Timer whose time() and getSnapshot() calls are stubbed with doReturn in the setup code.
63 private Timer commitTimer;
// Value returned by the stubbed commitTimer.time().
66 private Timer.Context commitTimerContext;
// Value returned by the stubbed commitTimer.getSnapshot(); its getValue(...) results are stubbed as well.
69 private Snapshot commitSnapshot;
// Factory bound to the actor system returned by getSystem(); used to create the test actors.
71 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Every cohort actor created by newCohortInfo(...), retained so verifyCohortActors() can check each one.
72 private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
// Transaction id shared by the proxy under test and the cohort builders.
73 private final TransactionIdentifier tx = nextTransactionId();
// Test fixture setup. NOTE(review): the enclosing method header is not visible in this view.
// Let Mockito initialise the mock fields of this test instance.
78 MockitoAnnotations.initMocks(this);
// Build the ActorUtils under the test actor system, using a DoNothingActor and mock cluster/configuration
// objects as collaborators. The anonymous subclass overrides two accessors.
80 actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
81 new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
82 new PrimaryShardInfoFutureCache()) {
// NOTE(review): body not visible here - presumably returns commitTimer; confirm against the full file.
84 public Timer getOperationTimer(final String operationName) {
// NOTE(review): body not visible here - the returned limit cannot be determined from this view.
89 public double getTxCreationLimit() {
// Make the timer hand out the prepared context and snapshot.
94 doReturn(commitTimerContext).when(commitTimer).time();
95 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
// Stub getValue for the arguments i * 0.1, i = 1..10, with i milliseconds expressed in nanoseconds.
96 for (int i = 1; i < 11; i++) {
97 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
98 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
99 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
104 public void testCanCommitYesWithOneCohort() throws Exception {
105 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
106 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
107 CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
109 verifyCanCommit(proxy.canCommit(), true);
110 verifyCohortActors();
114 public void testCanCommitNoWithOneCohort() throws Exception {
115 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
116 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
117 CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
119 verifyCanCommit(proxy.canCommit(), false);
120 verifyCohortActors();
124 public void testCanCommitYesWithTwoCohorts() throws Exception {
125 List<CohortInfo> cohorts = Arrays.asList(
126 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
127 CanCommitTransactionReply.yes(CURRENT_VERSION))),
128 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
129 CanCommitTransactionReply.yes(CURRENT_VERSION))));
130 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
132 verifyCanCommit(proxy.canCommit(), true);
133 verifyCohortActors();
137 public void testCanCommitNoWithThreeCohorts() throws Exception {
138 List<CohortInfo> cohorts = Arrays.asList(
139 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
140 CanCommitTransactionReply.yes(CURRENT_VERSION))),
141 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
142 CanCommitTransactionReply.no(CURRENT_VERSION))),
143 newCohortInfo(new CohortActor.Builder(tx)));
144 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
146 verifyCanCommit(proxy.canCommit(), false);
147 verifyCohortActors();
150 @Test(expected = TestException.class)
151 public void testCanCommitWithExceptionFailure() throws Exception {
152 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
153 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
155 propagateExecutionExceptionCause(proxy.canCommit());
158 @Test(expected = IllegalArgumentException.class)
159 public void testCanCommitWithInvalidResponseType() throws Exception {
160 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
161 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
163 propagateExecutionExceptionCause(proxy.canCommit());
166 @Test(expected = TestException.class)
167 public void testCanCommitWithFailedCohortFuture() throws Exception {
168 List<CohortInfo> cohorts = Arrays.asList(
169 newCohortInfo(new CohortActor.Builder(tx)),
170 newCohortInfoWithFailedFuture(new TestException()),
171 newCohortInfo(new CohortActor.Builder(tx)));
172 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
174 propagateExecutionExceptionCause(proxy.canCommit());
178 public void testAllThreePhasesSuccessful() throws Exception {
179 List<CohortInfo> cohorts = Arrays.asList(
181 new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
182 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
184 new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
185 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
186 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
188 verifyCanCommit(proxy.canCommit(), true);
189 verifySuccessfulFuture(proxy.preCommit());
190 verifySuccessfulFuture(proxy.commit());
191 verifyCohortActors();
194 @Test(expected = TestException.class)
195 public void testCommitWithExceptionFailure() throws Exception {
196 List<CohortInfo> cohorts = Arrays.asList(
198 new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
199 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
201 new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
202 .expectCommit(new TestException())));
203 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
205 verifyCanCommit(proxy.canCommit(), true);
206 verifySuccessfulFuture(proxy.preCommit());
207 propagateExecutionExceptionCause(proxy.commit());
210 @Test(expected = IllegalArgumentException.class)
211 public void testCommitWithInvalidResponseType() throws Exception {
212 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
213 Arrays.asList(newCohortInfo(new CohortActor.Builder(tx)
214 .expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).expectCommit("invalid"))), tx);
216 verifyCanCommit(proxy.canCommit(), true);
217 verifySuccessfulFuture(proxy.preCommit());
218 propagateExecutionExceptionCause(proxy.commit());
222 public void testAbort() throws Exception {
223 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
224 newCohortInfo(new CohortActor.Builder(tx).expectAbort(
225 AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
227 verifySuccessfulFuture(proxy.abort());
228 verifyCohortActors();
232 public void testAbortWithFailure() throws Exception {
233 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, Arrays.asList(
234 newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
236 // The exception should not get propagated.
237 verifySuccessfulFuture(proxy.abort());
238 verifyCohortActors();
242 public void testAbortWithFailedCohortFuture() throws Exception {
243 List<CohortInfo> cohorts = Arrays.asList(
244 newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
245 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, cohorts, tx);
247 verifySuccessfulFuture(proxy.abort());
248 verifyCohortActors();
252 public void testWithNoCohorts() throws Exception {
253 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
254 Collections.<CohortInfo>emptyList(), tx);
256 verifyCanCommit(proxy.canCommit(), true);
257 verifySuccessfulFuture(proxy.preCommit());
258 verifySuccessfulFuture(proxy.commit());
259 verifyCohortActors();
262 @SuppressWarnings("checkstyle:avoidHidingCauseException")
263 private void propagateExecutionExceptionCause(final ListenableFuture<?> future) throws Exception {
265 future.get(5, TimeUnit.SECONDS);
266 fail("Expected ExecutionException");
267 } catch (ExecutionException e) {
268 verifyCohortActors();
269 Throwables.propagateIfPossible(e.getCause(), Exception.class);
270 throw new RuntimeException(e.getCause());
274 private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
275 TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
276 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
277 cohortActors.add(actor);
278 return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
281 private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
282 return newCohortInfo(builder, CURRENT_VERSION);
285 private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
286 return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
289 private void verifyCohortActors() {
290 for (TestActorRef<CohortActor> actor: cohortActors) {
291 actor.underlyingActor().verify();
295 @SuppressWarnings("checkstyle:IllegalCatch")
296 private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
298 return future.get(5, TimeUnit.SECONDS);
299 } catch (Exception e) {
300 verifyCohortActors();
305 private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
306 Boolean actual = verifySuccessfulFuture(future);
307 assertEquals("canCommit", expected, actual);
310 private static class CohortActor extends UntypedAbstractActor {
311 private final Builder builder;
312 private final AtomicInteger canCommitCount = new AtomicInteger();
313 private final AtomicInteger commitCount = new AtomicInteger();
314 private final AtomicInteger abortCount = new AtomicInteger();
315 private volatile AssertionError assertionError;
317 CohortActor(final Builder builder) {
318 this.builder = builder;
322 public void onReceive(final Object message) {
323 if (CanCommitTransaction.isSerializedType(message)) {
324 canCommitCount.incrementAndGet();
325 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
326 builder.expCanCommitType, builder.canCommitReply);
327 } else if (CommitTransaction.isSerializedType(message)) {
328 commitCount.incrementAndGet();
329 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
330 builder.expCommitType, builder.commitReply);
331 } else if (AbortTransaction.isSerializedType(message)) {
332 abortCount.incrementAndGet();
333 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
334 builder.expAbortType, builder.abortReply);
336 assertionError = new AssertionError("Unexpected message " + message);
340 private void onMessage(final String name, final Object rawMessage,
341 final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
343 assertNotNull("Unexpected " + name, expType);
344 assertEquals(name + " type", expType, rawMessage.getClass());
345 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId());
347 if (reply instanceof Throwable) {
348 getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
350 getSender().tell(reply, self());
352 } catch (AssertionError e) {
358 if (assertionError != null) {
359 throw assertionError;
362 if (builder.expCanCommitType != null) {
363 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
366 if (builder.expCommitType != null) {
367 assertEquals("CommitTransaction count", 1, commitCount.get());
370 if (builder.expAbortType != null) {
371 assertEquals("AbortTransaction count", 1, abortCount.get());
375 static class Builder {
376 private Class<?> expCanCommitType;
377 private Class<?> expCommitType;
378 private Class<?> expAbortType;
379 private Object canCommitReply;
380 private Object commitReply;
381 private Object abortReply;
382 private final TransactionIdentifier transactionId;
384 Builder(final TransactionIdentifier transactionId) {
385 this.transactionId = requireNonNull(transactionId);
388 Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
389 this.expCanCommitType = newExpCanCommitType;
390 this.canCommitReply = newCanCommitReply;
394 Builder expectCanCommit(final Object newCanCommitReply) {
395 return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
398 Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
399 this.expCommitType = newExpCommitType;
400 this.commitReply = newCommitReply;
404 Builder expectCommit(final Object newCommitReply) {
405 return expectCommit(CommitTransaction.class, newCommitReply);
408 Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
409 this.expAbortType = newExpAbortType;
410 this.abortReply = newAbortReply;
414 Builder expectAbort(final Object newAbortReply) {
415 return expectAbort(AbortTransaction.class, newAbortReply);
419 return Props.create(CohortActor.class, this);