2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
17 import akka.actor.ActorSelection;
18 import akka.actor.Props;
19 import akka.actor.UntypedActor;
20 import akka.dispatch.Dispatchers;
21 import akka.dispatch.Futures;
22 import akka.testkit.TestActorRef;
23 import com.codahale.metrics.Snapshot;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Throwables;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.mockito.Mock;
38 import org.mockito.MockitoAnnotations;
39 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
40 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
41 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
44 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
47 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
48 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
49 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
50 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
51 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
52 import org.opendaylight.controller.cluster.raft.TestActorFactory;
53 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
55 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
57 @SuppressWarnings("serial")
58 static class TestException extends RuntimeException {
61 private ActorContext actorContext;
64 private Timer commitTimer;
67 private Timer.Context commitTimerContext;
70 private Snapshot commitSnapshot;
72 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
73 private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
74 private final TransactionIdentifier tx = nextTransactionId();
79 MockitoAnnotations.initMocks(this);
81 actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
82 new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
83 new PrimaryShardInfoFutureCache()) {
85 public Timer getOperationTimer(final String operationName) {
90 public double getTxCreationLimit() {
95 doReturn(commitTimerContext).when(commitTimer).time();
96 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
97 for (int i = 1; i < 11; i++) {
98 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
99 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
100 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
105 public void testCanCommitYesWithOneCohort() throws Exception {
106 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
107 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
108 CanCommitTransactionReply.yes(CURRENT_VERSION)))), tx);
110 verifyCanCommit(proxy.canCommit(), true);
111 verifyCohortActors();
115 public void testCanCommitNoWithOneCohort() throws Exception {
116 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
117 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
118 CanCommitTransactionReply.no(CURRENT_VERSION)))), tx);
120 verifyCanCommit(proxy.canCommit(), false);
121 verifyCohortActors();
125 public void testCanCommitYesWithTwoCohorts() throws Exception {
126 List<CohortInfo> cohorts = Arrays.asList(
127 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
128 CanCommitTransactionReply.yes(CURRENT_VERSION))),
129 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
130 CanCommitTransactionReply.yes(CURRENT_VERSION))));
131 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
133 verifyCanCommit(proxy.canCommit(), true);
134 verifyCohortActors();
138 public void testCanCommitNoWithThreeCohorts() throws Exception {
139 List<CohortInfo> cohorts = Arrays.asList(
140 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
141 CanCommitTransactionReply.yes(CURRENT_VERSION))),
142 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(
143 CanCommitTransactionReply.no(CURRENT_VERSION))),
144 newCohortInfo(new CohortActor.Builder(tx)));
145 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
147 verifyCanCommit(proxy.canCommit(), false);
148 verifyCohortActors();
151 @Test(expected = TestException.class)
152 public void testCanCommitWithExceptionFailure() throws Exception {
153 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
154 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit(new TestException()))), tx);
156 propagateExecutionExceptionCause(proxy.canCommit());
159 @Test(expected = IllegalArgumentException.class)
160 public void testCanCommitWithInvalidResponseType() throws Exception {
161 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
162 newCohortInfo(new CohortActor.Builder(tx).expectCanCommit("invalid"))), tx);
164 propagateExecutionExceptionCause(proxy.canCommit());
167 @Test(expected = TestException.class)
168 public void testCanCommitWithFailedCohortFuture() throws Exception {
169 List<CohortInfo> cohorts = Arrays.asList(
170 newCohortInfo(new CohortActor.Builder(tx)),
171 newCohortInfoWithFailedFuture(new TestException()),
172 newCohortInfo(new CohortActor.Builder(tx)));
173 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
175 propagateExecutionExceptionCause(proxy.canCommit());
179 public void testAllThreePhasesSuccessful() throws Exception {
180 List<CohortInfo> cohorts = Arrays.asList(
182 new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
183 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
185 new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
186 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
187 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
189 verifyCanCommit(proxy.canCommit(), true);
190 verifySuccessfulFuture(proxy.preCommit());
191 verifySuccessfulFuture(proxy.commit());
192 verifyCohortActors();
195 @Test(expected = TestException.class)
196 public void testCommitWithExceptionFailure() throws Exception {
197 List<CohortInfo> cohorts = Arrays.asList(
199 new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
200 .expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
202 new CohortActor.Builder(tx).expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION))
203 .expectCommit(new TestException())));
204 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
206 verifyCanCommit(proxy.canCommit(), true);
207 verifySuccessfulFuture(proxy.preCommit());
208 propagateExecutionExceptionCause(proxy.commit());
211 @Test(expected = IllegalArgumentException.class)
212 public void testCommitWithInvalidResponseType() throws Exception {
213 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
214 Arrays.asList(newCohortInfo(new CohortActor.Builder(tx)
215 .expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).expectCommit("invalid"))), tx);
217 verifyCanCommit(proxy.canCommit(), true);
218 verifySuccessfulFuture(proxy.preCommit());
219 propagateExecutionExceptionCause(proxy.commit());
223 public void testAbort() throws Exception {
224 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
225 newCohortInfo(new CohortActor.Builder(tx).expectAbort(
226 AbortTransactionReply.instance(CURRENT_VERSION)))), tx);
228 verifySuccessfulFuture(proxy.abort());
229 verifyCohortActors();
233 public void testAbortWithFailure() throws Exception {
234 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
235 newCohortInfo(new CohortActor.Builder(tx).expectAbort(new RuntimeException("mock")))), tx);
237 // The exception should not get propagated.
238 verifySuccessfulFuture(proxy.abort());
239 verifyCohortActors();
243 public void testAbortWithFailedCohortFuture() throws Exception {
244 List<CohortInfo> cohorts = Arrays.asList(
245 newCohortInfoWithFailedFuture(new TestException()), newCohortInfo(new CohortActor.Builder(tx)));
246 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, tx);
248 verifySuccessfulFuture(proxy.abort());
249 verifyCohortActors();
253 public void testWithNoCohorts() throws Exception {
254 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
255 Collections.<CohortInfo>emptyList(), tx);
257 verifyCanCommit(proxy.canCommit(), true);
258 verifySuccessfulFuture(proxy.preCommit());
259 verifySuccessfulFuture(proxy.commit());
260 verifyCohortActors();
263 private void propagateExecutionExceptionCause(final ListenableFuture<?> future) throws Exception {
266 future.get(5, TimeUnit.SECONDS);
267 fail("Expected ExecutionException");
268 } catch (ExecutionException e) {
269 verifyCohortActors();
270 Throwables.propagateIfPossible(e.getCause(), Exception.class);
271 throw new RuntimeException(e.getCause());
275 private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
276 TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
277 .withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
278 cohortActors.add(actor);
279 return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
282 private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
283 return newCohortInfo(builder, CURRENT_VERSION);
286 private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
287 return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
290 private void verifyCohortActors() {
291 for (TestActorRef<CohortActor> actor: cohortActors) {
292 actor.underlyingActor().verify();
296 @SuppressWarnings("checkstyle:IllegalCatch")
297 private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
299 return future.get(5, TimeUnit.SECONDS);
300 } catch (Exception e) {
301 verifyCohortActors();
306 private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
307 Boolean actual = verifySuccessfulFuture(future);
308 assertEquals("canCommit", expected, actual);
311 private static class CohortActor extends UntypedActor {
312 private final Builder builder;
313 private final AtomicInteger canCommitCount = new AtomicInteger();
314 private final AtomicInteger commitCount = new AtomicInteger();
315 private final AtomicInteger abortCount = new AtomicInteger();
316 private volatile AssertionError assertionError;
318 private CohortActor(final Builder builder) {
319 this.builder = builder;
323 public void onReceive(final Object message) {
324 if (CanCommitTransaction.isSerializedType(message)) {
325 canCommitCount.incrementAndGet();
326 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
327 builder.expCanCommitType, builder.canCommitReply);
328 } else if (CommitTransaction.isSerializedType(message)) {
329 commitCount.incrementAndGet();
330 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
331 builder.expCommitType, builder.commitReply);
332 } else if (AbortTransaction.isSerializedType(message)) {
333 abortCount.incrementAndGet();
334 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
335 builder.expAbortType, builder.abortReply);
337 assertionError = new AssertionError("Unexpected message " + message);
341 private void onMessage(final String name, final Object rawMessage,
342 final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
344 assertNotNull("Unexpected " + name, expType);
345 assertEquals(name + " type", expType, rawMessage.getClass());
346 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionId());
348 if (reply instanceof Throwable) {
349 getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
351 getSender().tell(reply, self());
353 } catch (AssertionError e) {
359 if (assertionError != null) {
360 throw assertionError;
363 if (builder.expCanCommitType != null) {
364 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
367 if (builder.expCommitType != null) {
368 assertEquals("CommitTransaction count", 1, commitCount.get());
371 if (builder.expAbortType != null) {
372 assertEquals("AbortTransaction count", 1, abortCount.get());
376 static class Builder {
377 private Class<?> expCanCommitType;
378 private Class<?> expCommitType;
379 private Class<?> expAbortType;
380 private Object canCommitReply;
381 private Object commitReply;
382 private Object abortReply;
383 private final TransactionIdentifier transactionId;
385 Builder(final TransactionIdentifier transactionId) {
386 this.transactionId = Preconditions.checkNotNull(transactionId);
389 Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
390 this.expCanCommitType = newExpCanCommitType;
391 this.canCommitReply = newCanCommitReply;
395 Builder expectCanCommit(final Object newCanCommitReply) {
396 return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
399 Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
400 this.expCommitType = newExpCommitType;
401 this.commitReply = newCommitReply;
405 Builder expectCommit(final Object newCommitReply) {
406 return expectCommit(CommitTransaction.class, newCommitReply);
409 Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
410 this.expAbortType = newExpAbortType;
411 this.abortReply = newAbortReply;
415 Builder expectAbort(final Object newAbortReply) {
416 return expectAbort(AbortTransaction.class, newAbortReply);
420 return Props.create(CohortActor.class, this);