/*
 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
11 import static org.hamcrest.CoreMatchers.instanceOf;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.Assert.assertEquals;
14 import static org.junit.Assert.assertNotNull;
15 import static org.junit.Assert.assertThrows;
16 import static org.mockito.Mockito.lenient;
17 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
19 import akka.actor.ActorSelection;
20 import akka.actor.Props;
21 import akka.actor.UntypedAbstractActor;
22 import akka.dispatch.Dispatchers;
23 import akka.dispatch.Futures;
24 import akka.testkit.TestActorRef;
25 import com.codahale.metrics.Snapshot;
26 import com.codahale.metrics.Timer;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.junit.runner.RunWith;
36 import org.mockito.Mock;
37 import org.mockito.junit.MockitoJUnitRunner;
38 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
39 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
40 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
43 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
48 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
49 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
50 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
51 import org.opendaylight.controller.cluster.raft.TestActorFactory;
52 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
54 @Deprecated(since = "9.0.0", forRemoval = true)
55 @RunWith(MockitoJUnitRunner.StrictStubs.class)
56 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
57 static class TestException extends RuntimeException {
58 private static final long serialVersionUID = 1L;
62 private ActorUtils actorUtils;
65 private Timer commitTimer;
67 private Timer.Context commitTimerContext;
69 private Snapshot commitSnapshot;
71 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
72 private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
73 private final TransactionIdentifier tx = nextTransactionId();
77 actorUtils = new ActorUtils(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
78 new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
79 new PrimaryShardInfoFutureCache()) {
81 public Timer getOperationTimer(final String operationName) {
86 public double getTxCreationLimit() {
91 lenient().doReturn(commitTimerContext).when(commitTimer).time();
92 lenient().doReturn(commitSnapshot).when(commitTimer).getSnapshot();
93 for (int i = 1; i < 11; i++) {
94 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
95 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
96 lenient().doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
101 public void testCanCommitYesWithOneCohort() throws Exception {
102 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
103 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))),
106 verifyCanCommit(proxy.canCommit(), true);
107 verifyCohortActors();
111 public void testCanCommitNoWithOneCohort() throws Exception {
112 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
113 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION)))),
116 verifyCanCommit(proxy.canCommit(), false);
117 verifyCohortActors();
121 public void testCanCommitYesWithTwoCohorts() throws Exception {
122 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
123 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))),
124 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)))),
127 verifyCanCommit(proxy.canCommit(), true);
128 verifyCohortActors();
132 public void testCanCommitNoWithThreeCohorts() throws Exception {
133 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
134 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))),
135 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.no(CURRENT_VERSION))),
136 newCohortInfo(new CohortActor.Builder(tx))), tx);
138 verifyCanCommit(proxy.canCommit(), false);
139 verifyCohortActors();
143 public void testCanCommitWithExceptionFailure() {
144 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
145 List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
147 propagateExecutionExceptionCause(proxy.canCommit(), TestException.class);
151 public void testCanCommitWithInvalidResponseType() {
152 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
153 List.of(newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
155 assertEquals("Unexpected response type class java.lang.String",
156 propagateExecutionExceptionCause(proxy.canCommit(), IllegalArgumentException.class));
160 public void testCanCommitWithFailedCohortFuture() throws Exception {
161 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
162 newCohortInfo(new CohortActor.Builder(tx)),
163 newCohortInfoWithFailedFuture(new TestException()),
164 newCohortInfo(new CohortActor.Builder(tx))), tx);
166 propagateExecutionExceptionCause(proxy.canCommit(), TestException.class);
170 public void testAllThreePhasesSuccessful() throws Exception {
171 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
172 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
173 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
174 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
175 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION)))), tx);
177 verifyCanCommit(proxy.canCommit(), true);
178 verifySuccessfulFuture(proxy.preCommit());
179 verifySuccessfulFuture(proxy.commit());
180 verifyCohortActors();
184 public void testCommitWithExceptionFailure() throws Exception {
185 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
186 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
187 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
188 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
189 .expectCommit(new TestException()))), tx);
191 verifyCanCommit(proxy.canCommit(), true);
192 verifySuccessfulFuture(proxy.preCommit());
193 propagateExecutionExceptionCause(proxy.commit(), TestException.class);
197 public void testCommitWithInvalidResponseType() throws Exception {
198 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,List.of(
199 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
200 .expectCommit("invalid"))),
203 verifyCanCommit(proxy.canCommit(), true);
204 verifySuccessfulFuture(proxy.preCommit());
205 assertEquals("Unexpected response type class java.lang.String",
206 propagateExecutionExceptionCause(proxy.commit(), IllegalArgumentException.class));
210 public void testAbort() throws Exception {
211 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
212 List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(
213 AbortTransactionReply.instance(CURRENT_VERSION)))),
216 verifySuccessfulFuture(proxy.abort());
217 verifyCohortActors();
221 public void testAbortWithFailure() throws Exception {
222 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils,
223 List.of(newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
225 // The exception should not get propagated.
226 verifySuccessfulFuture(proxy.abort());
227 verifyCohortActors();
231 public void testAbortWithFailedCohortFuture() throws Exception {
232 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(
233 newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx))), tx);
235 verifySuccessfulFuture(proxy.abort());
236 verifyCohortActors();
240 public void testWithNoCohorts() throws Exception {
241 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorUtils, List.of(), tx);
243 verifyCanCommit(proxy.canCommit(), true);
244 verifySuccessfulFuture(proxy.preCommit());
245 verifySuccessfulFuture(proxy.commit());
246 verifyCohortActors();
249 private String propagateExecutionExceptionCause(final ListenableFuture<?> future,
250 final Class<? extends Exception> expected) {
251 final var ex = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS)).getCause();
252 verifyCohortActors();
253 assertThat(ex, instanceOf(expected));
254 return ex.getMessage();
257 private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
258 TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
259 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
260 cohortActors.add(actor);
261 return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
264 private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
265 return newCohortInfo(builder, CURRENT_VERSION);
268 private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
269 return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
272 private void verifyCohortActors() {
273 for (TestActorRef<CohortActor> actor: cohortActors) {
274 actor.underlyingActor().verify();
278 @SuppressWarnings("checkstyle:IllegalCatch")
279 private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
281 return future.get(5, TimeUnit.SECONDS);
282 } catch (Exception e) {
283 verifyCohortActors();
288 private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
289 Boolean actual = verifySuccessfulFuture(future);
290 assertEquals("canCommit", expected, actual);
293 private static class CohortActor extends UntypedAbstractActor {
294 private final Builder builder;
295 private final AtomicInteger canCommitCount = new AtomicInteger();
296 private final AtomicInteger commitCount = new AtomicInteger();
297 private final AtomicInteger abortCount = new AtomicInteger();
298 private volatile AssertionError assertionError;
300 CohortActor(final Builder builder) {
301 this.builder = builder;
305 public void onReceive(final Object message) {
306 if (CanCommitTransaction.isSerializedType(message)) {
307 canCommitCount.incrementAndGet();
308 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
309 builder.expCanCommitType, builder.canCommitReply);
310 } else if (CommitTransaction.isSerializedType(message)) {
311 commitCount.incrementAndGet();
312 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
313 builder.expCommitType, builder.commitReply);
314 } else if (AbortTransaction.isSerializedType(message)) {
315 abortCount.incrementAndGet();
316 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
317 builder.expAbortType, builder.abortReply);
319 assertionError = new AssertionError("Unexpected message " + message);
323 private void onMessage(final String name, final Object rawMessage,
324 final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
326 assertNotNull("Unexpected " + name, expType);
327 assertEquals(name + " type", expType, rawMessage.getClass());
328 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId());
330 if (reply instanceof Throwable) {
331 getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
333 getSender().tell(reply, self());
335 } catch (AssertionError e) {
341 if (assertionError != null) {
342 throw assertionError;
345 if (builder.expCanCommitType != null) {
346 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
349 if (builder.expCommitType != null) {
350 assertEquals("CommitTransaction count", 1, commitCount.get());
353 if (builder.expAbortType != null) {
354 assertEquals("AbortTransaction count", 1, abortCount.get());
358 static class Builder {
359 private Class<?> expCanCommitType;
360 private Class<?> expCommitType;
361 private Class<?> expAbortType;
362 private Object canCommitReply;
363 private Object commitReply;
364 private Object abortReply;
365 private final TransactionIdentifier transactionId;
367 Builder(final TransactionIdentifier transactionId) {
368 this.transactionId = requireNonNull(transactionId);
371 Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
372 expCanCommitType = newExpCanCommitType;
373 canCommitReply = newCanCommitReply;
377 Builder expectCanCommit(final Object newCanCommitReply) {
378 return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
381 Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
382 expCommitType = newExpCommitType;
383 commitReply = newCommitReply;
387 Builder expectCommit(final Object newCommitReply) {
388 return expectCommit(CommitTransaction.class, newCommitReply);
391 Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
392 expAbortType = newExpAbortType;
393 abortReply = newAbortReply;
397 Builder expectAbort(final Object newAbortReply) {
398 return expectAbort(AbortTransaction.class, newAbortReply);
402 return Props.create(CohortActor.class, this);