2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
16 import akka.actor.ActorSelection;
17 import akka.actor.Props;
18 import akka.actor.UntypedActor;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.Futures;
21 import akka.testkit.TestActorRef;
22 import com.codahale.metrics.Snapshot;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Supplier;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
38 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
41 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
49 import org.opendaylight.controller.cluster.raft.TestActorFactory;
50 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
52 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
54 @SuppressWarnings("serial")
55 static class TestException extends RuntimeException {
58 private ActorContext actorContext;
61 private Timer commitTimer;
64 private Timer.Context commitTimerContext;
67 private Snapshot commitSnapshot;
69 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
70 private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
74 MockitoAnnotations.initMocks(this);
76 actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
77 new MockClusterWrapper(), new MockConfiguration(),
78 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) {
80 public Timer getOperationTimer(String operationName) {
85 public double getTxCreationLimit() {
90 doReturn(commitTimerContext).when(commitTimer).time();
91 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
92 for(int i=1;i<11;i++){
93 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
94 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
95 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
100 public void testCanCommitYesWithOneCohort() throws Exception {
101 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
102 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
103 CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
105 verifyCanCommit(proxy.canCommit(), true);
106 verifyCohortActors();
110 public void testCanCommitNoWithOneCohort() throws Exception {
111 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
112 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
113 CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
115 verifyCanCommit(proxy.canCommit(), false);
116 verifyCohortActors();
120 public void testCanCommitYesWithTwoCohorts() throws Exception {
121 List<CohortInfo> cohorts = Arrays.asList(
122 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
123 CanCommitTransactionReply.yes(CURRENT_VERSION))),
124 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
125 CanCommitTransactionReply.yes(CURRENT_VERSION))));
126 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
128 verifyCanCommit(proxy.canCommit(), true);
129 verifyCohortActors();
133 public void testCanCommitNoWithThreeCohorts() throws Exception {
134 List<CohortInfo> cohorts = Arrays.asList(
135 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
136 CanCommitTransactionReply.yes(CURRENT_VERSION))),
137 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
138 CanCommitTransactionReply.no(CURRENT_VERSION))),
139 newCohortInfo(new CohortActor.Builder("txn-1")));
140 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
142 verifyCanCommit(proxy.canCommit(), false);
143 verifyCohortActors();
146 @Test(expected = TestException.class)
147 public void testCanCommitWithExceptionFailure() throws Throwable {
148 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
149 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
151 propagateExecutionExceptionCause(proxy.canCommit());
154 @Test(expected = IllegalArgumentException.class)
155 public void testCanCommitWithInvalidResponseType() throws Throwable {
156 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
157 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
159 propagateExecutionExceptionCause(proxy.canCommit());
162 @Test(expected = TestException.class)
163 public void testCanCommitWithFailedCohortFuture() throws Throwable {
164 List<CohortInfo> cohorts = Arrays.asList(
165 newCohortInfo(new CohortActor.Builder("txn-1")),
166 newCohortInfoWithFailedFuture(new TestException()),
167 newCohortInfo(new CohortActor.Builder("txn-1")));
168 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
170 propagateExecutionExceptionCause(proxy.canCommit());
174 public void testAllThreePhasesSuccessful() throws Exception {
175 List<CohortInfo> cohorts = Arrays.asList(
176 newCohortInfo(new CohortActor.Builder("txn-1").
177 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
178 expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
179 newCohortInfo(new CohortActor.Builder("txn-1").
180 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
181 expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
182 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
184 verifyCanCommit(proxy.canCommit(), true);
185 verifySuccessfulFuture(proxy.preCommit());
186 verifySuccessfulFuture(proxy.commit());
187 verifyCohortActors();
190 @Test(expected = TestException.class)
191 public void testCommitWithExceptionFailure() throws Throwable {
192 List<CohortInfo> cohorts = Arrays.asList(
193 newCohortInfo(new CohortActor.Builder("txn-1").
194 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
195 expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
196 newCohortInfo(new CohortActor.Builder("txn-1").
197 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
198 expectCommit(new TestException())));
199 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
201 verifyCanCommit(proxy.canCommit(), true);
202 verifySuccessfulFuture(proxy.preCommit());
203 propagateExecutionExceptionCause(proxy.commit());
206 @Test(expected = IllegalArgumentException.class)
207 public void testCommitWithInvalidResponseType() throws Throwable {
208 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
209 newCohortInfo(new CohortActor.Builder("txn-1").
210 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
211 expectCommit("invalid"))), "txn-1");
213 verifyCanCommit(proxy.canCommit(), true);
214 verifySuccessfulFuture(proxy.preCommit());
215 propagateExecutionExceptionCause(proxy.commit());
219 public void testAbort() throws Exception {
220 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
221 newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
222 AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
224 verifySuccessfulFuture(proxy.abort());
225 verifyCohortActors();
229 public void testAbortWithFailure() throws Exception {
230 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
231 newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
233 // The exception should not get propagated.
234 verifySuccessfulFuture(proxy.abort());
235 verifyCohortActors();
239 public void testAbortWithFailedCohortFuture() throws Throwable {
240 List<CohortInfo> cohorts = Arrays.asList(
241 newCohortInfoWithFailedFuture(new TestException()),
242 newCohortInfo(new CohortActor.Builder("txn-1")));
243 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
245 verifySuccessfulFuture(proxy.abort());
246 verifyCohortActors();
250 public void testWithNoCohorts() throws Exception {
251 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
252 Collections.<CohortInfo>emptyList(), "txn-1");
254 verifyCanCommit(proxy.canCommit(), true);
255 verifySuccessfulFuture(proxy.preCommit());
256 verifySuccessfulFuture(proxy.commit());
257 verifyCohortActors();
260 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
263 future.get(5, TimeUnit.SECONDS);
264 fail("Expected ExecutionException");
265 } catch(ExecutionException e) {
266 verifyCohortActors();
271 private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
272 TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props().
273 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
274 cohortActors.add(actor);
275 return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier<Short>() {
283 private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
284 return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
287 return CURRENT_VERSION;
292 private CohortInfo newCohortInfo(CohortActor.Builder builder) {
293 return newCohortInfo(builder, CURRENT_VERSION);
296 private void verifyCohortActors() {
297 for(TestActorRef<CohortActor> actor: cohortActors) {
298 actor.underlyingActor().verify();
302 private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
304 return future.get(5, TimeUnit.SECONDS);
305 } catch(Exception e) {
306 verifyCohortActors();
311 private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
312 Boolean actual = verifySuccessfulFuture(future);
313 assertEquals("canCommit", expected, actual);
316 private static class CohortActor extends UntypedActor {
317 private final Builder builder;
318 private final AtomicInteger canCommitCount = new AtomicInteger();
319 private final AtomicInteger commitCount = new AtomicInteger();
320 private final AtomicInteger abortCount = new AtomicInteger();
321 private volatile AssertionError assertionError;
323 private CohortActor(Builder builder) {
324 this.builder = builder;
328 public void onReceive(Object message) {
329 if(CanCommitTransaction.isSerializedType(message)) {
330 canCommitCount.incrementAndGet();
331 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
332 builder.expCanCommitType, builder.canCommitReply);
333 } else if(CommitTransaction.isSerializedType(message)) {
334 commitCount.incrementAndGet();
335 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
336 builder.expCommitType, builder.commitReply);
337 } else if(AbortTransaction.isSerializedType(message)) {
338 abortCount.incrementAndGet();
339 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
340 builder.expAbortType, builder.abortReply);
342 assertionError = new AssertionError("Unexpected message " + message);
346 private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
347 Class<?> expType, Object reply) {
349 assertNotNull("Unexpected " + name, expType);
350 assertEquals(name + " type", expType, rawMessage.getClass());
351 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID());
353 if(reply instanceof Throwable) {
354 getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
356 getSender().tell(reply, self());
358 } catch(AssertionError e) {
364 if(assertionError != null) {
365 throw assertionError;
368 if(builder.expCanCommitType != null) {
369 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
372 if(builder.expCommitType != null) {
373 assertEquals("CommitTransaction count", 1, commitCount.get());
376 if(builder.expAbortType != null) {
377 assertEquals("AbortTransaction count", 1, abortCount.get());
381 static class Builder {
382 private Class<?> expCanCommitType;
383 private Class<?> expCommitType;
384 private Class<?> expAbortType;
385 private Object canCommitReply;
386 private Object commitReply;
387 private Object abortReply;
388 private final String transactionId;
390 Builder(String transactionId) {
391 this.transactionId = transactionId;
394 Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
395 this.expCanCommitType = expCanCommitType;
396 this.canCommitReply = canCommitReply;
400 Builder expectCanCommit(Object canCommitReply) {
401 return expectCanCommit(CanCommitTransaction.class, canCommitReply);
404 Builder expectCommit(Class<?> expCommitType, Object commitReply) {
405 this.expCommitType = expCommitType;
406 this.commitReply = commitReply;
410 Builder expectCommit(Object commitReply) {
411 return expectCommit(CommitTransaction.class, commitReply);
414 Builder expectAbort(Class<?> expAbortType, Object abortReply) {
415 this.expAbortType = expAbortType;
416 this.abortReply = abortReply;
420 Builder expectAbort(Object abortReply) {
421 return expectAbort(AbortTransaction.class, abortReply);
425 return Props.create(CohortActor.class, this);