2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
16 import akka.actor.ActorSelection;
17 import akka.actor.Props;
18 import akka.actor.UntypedActor;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.Futures;
21 import akka.testkit.TestActorRef;
22 import com.codahale.metrics.Snapshot;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Supplier;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
38 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
41 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
49 import org.opendaylight.controller.cluster.raft.TestActorFactory;
50 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
51 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
53 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
/**
 * Runtime exception used by the tests in this class as a recognisable failure: it is handed to
 * cohort actors as a reply and to failed cohort futures, and several tests name it as their
 * expected exception.
 * NOTE(review): the class body / closing brace is not visible in this view.
 */
55 @SuppressWarnings("serial")
56 static class TestException extends RuntimeException {
// Actor context passed to every ThreePhaseCommitCohortProxy created by the tests.
59 private ActorContext actorContext;
// Timer whose time() and getSnapshot() calls are stubbed with doReturn(...) during setup.
// NOTE(review): presumably a Mockito @Mock field (initMocks(this) is called) - annotation not visible here.
62 private Timer commitTimer;
// Value returned by the stubbed commitTimer.time().
65 private Timer.Context commitTimerContext;
// Value returned by the stubbed commitTimer.getSnapshot(); its getValue(...) is stubbed per quantile.
68 private Snapshot commitSnapshot;
// Factory used to create the actors needed by the tests, built on the actor system from getSystem().
70 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Every cohort actor created through newCohortInfo(...), kept so verifyCohortActors() can check each one.
71 private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
// Fixture setup: initialises the Mockito annotations, builds the ActorContext used by the tests
// and stubs the commit timer and its snapshot.
// NOTE(review): the enclosing method header and several short lines (overridden method bodies,
// closing braces) are not visible in this view - presumably a JUnit @Before method; confirm.
75 MockitoAnnotations.initMocks(this);
// Anonymous ActorContext subclass built from the test actor system, an actor created from
// DoNothingActor props, a mock cluster wrapper, a mock configuration and a default DatastoreContext.
77 actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
78 new MockClusterWrapper(), new MockConfiguration(),
79 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) {
// Override of the operation-timer lookup.
// NOTE(review): body not visible here - presumably returns commitTimer; confirm.
81 public Timer getOperationTimer(String operationName) {
// Override of the transaction-creation limit. NOTE(review): the returned value is not visible here.
86 public double getTxCreationLimit() {
// Stub the timer so that timing an operation and taking a snapshot return the fixture objects.
91 doReturn(commitTimerContext).when(commitTimer).time();
92 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
// i runs from 1 to 10, so i * 0.1 covers the quantiles 0.1 through 1.0.
93 for(int i=1;i<11;i++){
94 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
95 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
// The stubbed value is i milliseconds converted to nanoseconds, widened to double by the "* 1D".
96 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
/**
 * Checks that canCommit() completes with {@code true} when the proxy has a single cohort that
 * answers CanCommitTransaction with {@code CanCommitTransactionReply.yes}.
 */
101 public void testCanCommitYesWithOneCohort() throws Exception {
102 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
103 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
104 CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
106 verifyCanCommit(proxy.canCommit(), true);
// Confirm the cohort actor recorded no assertion failure and saw the expected message exactly once.
107 verifyCohortActors();
/**
 * Checks that canCommit() completes with {@code false} when the proxy has a single cohort that
 * answers CanCommitTransaction with {@code CanCommitTransactionReply.no}.
 */
111 public void testCanCommitNoWithOneCohort() throws Exception {
112 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
113 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
114 CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
116 verifyCanCommit(proxy.canCommit(), false);
// Confirm the cohort actor recorded no assertion failure and saw the expected message exactly once.
117 verifyCohortActors();
/**
 * Checks that canCommit() completes with {@code true} when both of two cohorts answer
 * CanCommitTransaction with {@code CanCommitTransactionReply.yes}.
 */
121 public void testCanCommitYesWithTwoCohorts() throws Exception {
122 List<CohortInfo> cohorts = Arrays.asList(
123 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
124 CanCommitTransactionReply.yes(CURRENT_VERSION))),
125 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
126 CanCommitTransactionReply.yes(CURRENT_VERSION))));
127 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
129 verifyCanCommit(proxy.canCommit(), true);
// Each cohort must have received its expected CanCommitTransaction exactly once.
130 verifyCohortActors();
/**
 * Checks that canCommit() completes with {@code false} for three cohorts where the first replies
 * yes, the second replies no, and the third has no expectation configured at all.
 */
134 public void testCanCommitNoWithThreeCohorts() throws Exception {
135 List<CohortInfo> cohorts = Arrays.asList(
136 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
137 CanCommitTransactionReply.yes(CURRENT_VERSION))),
138 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
139 CanCommitTransactionReply.no(CURRENT_VERSION))),
// No expectation on the third cohort: CohortActor.onMessage asserts that an expected type was
// configured for any message it handles, so this actor is not expected to be contacted.
140 newCohortInfo(new CohortActor.Builder("txn-1")));
141 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
143 verifyCanCommit(proxy.canCommit(), false);
144 verifyCohortActors();
/**
 * Expects a TestException (per the {@code expected} attribute) when the single cohort answers
 * CanCommitTransaction with a TestException. Because the configured reply is a Throwable, the
 * cohort actor sends it wrapped in an akka Status.Failure.
 */
147 @Test(expected = TestException.class)
148 public void testCanCommitWithExceptionFailure() throws Throwable {
149 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
150 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
// The helper requires the future to fail with an ExecutionException.
152 propagateExecutionExceptionCause(proxy.canCommit());
/**
 * Expects an IllegalArgumentException (per the {@code expected} attribute) when the single cohort
 * answers CanCommitTransaction with the plain string {@code "invalid"} instead of a reply message.
 */
155 @Test(expected = IllegalArgumentException.class)
156 public void testCanCommitWithInvalidResponseType() throws Throwable {
157 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
158 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
// The helper requires the future to fail with an ExecutionException.
160 propagateExecutionExceptionCause(proxy.canCommit());
/**
 * Expects a TestException (per the {@code expected} attribute) from canCommit() when one of three
 * cohorts is backed by an already-failed actor-selection future. The two other cohorts have no
 * expectations configured.
 */
163 @Test(expected = TestException.class)
164 public void testCanCommitWithFailedCohortFuture() throws Throwable {
165 List<CohortInfo> cohorts = Arrays.asList(
166 newCohortInfo(new CohortActor.Builder("txn-1")),
// Cohort whose actor-selection future has already failed with a TestException.
167 newCohortInfoWithFailedFuture(new TestException()),
168 newCohortInfo(new CohortActor.Builder("txn-1")));
169 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
171 propagateExecutionExceptionCause(proxy.canCommit());
/**
 * Runs canCommit, preCommit and commit against two cohorts that each reply yes to
 * CanCommitTransaction and answer CommitTransaction with a CommitTransactionReply, and checks
 * that all three futures complete successfully (canCommit with {@code true}).
 */
175 public void testAllThreePhasesSuccessful() throws Exception {
176 List<CohortInfo> cohorts = Arrays.asList(
177 newCohortInfo(new CohortActor.Builder("txn-1").
178 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
179 expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
180 newCohortInfo(new CohortActor.Builder("txn-1").
181 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
182 expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
183 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
185 verifyCanCommit(proxy.canCommit(), true);
// No cohort expectation is configured for preCommit; only the proxy's future is checked.
186 verifySuccessfulFuture(proxy.preCommit());
187 verifySuccessfulFuture(proxy.commit());
// Each cohort must have seen exactly one CanCommitTransaction and one CommitTransaction.
188 verifyCohortActors();
/**
 * Expects a TestException (per the {@code expected} attribute) from commit() when the second of
 * two cohorts answers CommitTransaction with a TestException. canCommit and preCommit are
 * required to succeed first.
 */
191 @Test(expected = TestException.class)
192 public void testCommitWithExceptionFailure() throws Throwable {
193 List<CohortInfo> cohorts = Arrays.asList(
194 newCohortInfo(new CohortActor.Builder("txn-1").
195 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
196 expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
197 newCohortInfo(new CohortActor.Builder("txn-1").
198 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
// A Throwable reply is sent back by the cohort actor as an akka Status.Failure.
199 expectCommit(new TestException())));
200 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
202 verifyCanCommit(proxy.canCommit(), true);
203 verifySuccessfulFuture(proxy.preCommit());
204 propagateExecutionExceptionCause(proxy.commit());
/**
 * Expects an IllegalArgumentException (per the {@code expected} attribute) from commit() when the
 * single cohort answers CommitTransaction with the plain string {@code "invalid"}. canCommit and
 * preCommit are required to succeed first.
 */
207 @Test(expected = IllegalArgumentException.class)
208 public void testCommitWithInvalidResponseType() throws Throwable {
209 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
210 newCohortInfo(new CohortActor.Builder("txn-1").
211 expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
212 expectCommit("invalid"))), "txn-1");
214 verifyCanCommit(proxy.canCommit(), true);
215 verifySuccessfulFuture(proxy.preCommit());
216 propagateExecutionExceptionCause(proxy.commit());
/**
 * Checks that abort() completes successfully when the single cohort answers AbortTransaction
 * with an AbortTransactionReply.
 */
220 public void testAbort() throws Exception {
221 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
222 newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
223 AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
225 verifySuccessfulFuture(proxy.abort());
// The cohort must have received exactly one AbortTransaction.
226 verifyCohortActors();
/**
 * Checks that abort() still completes successfully when the single cohort answers
 * AbortTransaction with a RuntimeException.
 */
230 public void testAbortWithFailure() throws Exception {
231 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
232 newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
234 // The exception should not get propagated.
235 verifySuccessfulFuture(proxy.abort());
// The cohort must nevertheless have received exactly one AbortTransaction.
236 verifyCohortActors();
/**
 * Checks that abort() completes successfully even though the first of two cohorts is backed by
 * an already-failed actor-selection future. The second cohort has no expectations configured.
 */
240 public void testAbortWithFailedCohortFuture() throws Throwable {
241 List<CohortInfo> cohorts = Arrays.asList(
242 newCohortInfoWithFailedFuture(new TestException()),
243 newCohortInfo(new CohortActor.Builder("txn-1")));
244 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
246 verifySuccessfulFuture(proxy.abort());
247 verifyCohortActors();
/**
 * Checks that a proxy built with an empty cohort list completes canCommit with {@code true}
 * and completes preCommit and commit successfully.
 */
251 public void testWithNoCohorts() throws Exception {
252 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
253 Collections.<CohortInfo>emptyList(), "txn-1");
255 verifyCanCommit(proxy.canCommit(), true);
256 verifySuccessfulFuture(proxy.preCommit());
257 verifySuccessfulFuture(proxy.commit());
// No cohort is registered by this test, so there is nothing test-specific to verify here.
258 verifyCohortActors();
/**
 * Checks the message types sent to a cohort whose version supplier is given LITHIUM_VERSION:
 * the cohort actor is configured to expect the ThreePhaseCommitCohortMessages.CanCommitTransaction
 * and ThreePhaseCommitCohortMessages.CommitTransaction classes (rather than the default
 * CanCommitTransaction / CommitTransaction classes) and replies with LITHIUM_VERSION replies.
 * All three phases must complete successfully.
 */
262 public void testBackwardsCompatibilityWithPreBoron() throws Exception {
263 List<CohortInfo> cohorts = Arrays.asList(
264 newCohortInfo(new CohortActor.Builder("txn-1").
265 expectCanCommit(ThreePhaseCommitCohortMessages.CanCommitTransaction.class,
266 CanCommitTransactionReply.yes(DataStoreVersions.LITHIUM_VERSION)).
267 expectCommit(ThreePhaseCommitCohortMessages.CommitTransaction.class,
268 CommitTransactionReply.instance(DataStoreVersions.LITHIUM_VERSION)),
// Version passed to the two-argument newCohortInfo for this cohort.
269 DataStoreVersions.LITHIUM_VERSION));
270 ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
272 verifyCanCommit(proxy.canCommit(), true);
273 verifySuccessfulFuture(proxy.preCommit());
274 verifySuccessfulFuture(proxy.commit());
// The actor asserts in onMessage that the raw message class equals the expected class.
275 verifyCohortActors();
/**
 * Waits up to 5 seconds for {@code future} and fails the test if it completes without throwing.
 * When an ExecutionException is caught, the cohort actors are verified first.
 * NOTE(review): the try opener and whatever follows verifyCohortActors() in the catch block are
 * not visible in this view - going by the method name and the callers' expected exceptions it
 * presumably rethrows e.getCause(); confirm.
 *
 * @param future the future that is expected to fail
 * @throws Throwable presumably the cause of the ExecutionException - confirm against the full file
 */
278 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
281 future.get(5, TimeUnit.SECONDS);
// Reaching this line means the future succeeded, which the calling tests treat as a failure.
282 fail("Expected ExecutionException");
283 } catch(ExecutionException e) {
284 verifyCohortActors();
/**
 * Creates a CohortActor from the builder's props, records it in {@code cohortActors} and wraps
 * it in a CohortInfo whose actor-selection future is already successful.
 * NOTE(review): the Supplier body is not visible in this view - presumably it returns
 * {@code version}; confirm.
 *
 * @param builder expectations and replies for the new cohort actor
 * @param version version handed to the CohortInfo's supplier (see note above)
 * @return the CohortInfo referring to the new actor
 */
289 private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
// The props are given the default dispatcher id; the actor id is generated from the "cohort" prefix.
290 TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props().
291 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
// Remember the actor so verifyCohortActors() can check it later.
292 cohortActors.add(actor);
293 return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier<Short>() {
/**
 * Builds a CohortInfo whose actor-selection future has already failed with {@code failure} and
 * whose version supplier returns CURRENT_VERSION. No actor is created for it.
 *
 * @param failure the exception the actor-selection future fails with
 * @return the CohortInfo with the failed future
 */
301 private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
302 return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
305 return CURRENT_VERSION;
/**
 * Convenience overload that creates a cohort using CURRENT_VERSION.
 *
 * @param builder expectations and replies for the new cohort actor
 * @return the CohortInfo produced by the two-argument overload
 */
310 private CohortInfo newCohortInfo(CohortActor.Builder builder) {
311 return newCohortInfo(builder, CURRENT_VERSION);
/**
 * Calls verify() on the underlying CohortActor of every actor recorded in {@code cohortActors}.
 */
314 private void verifyCohortActors() {
315 for(TestActorRef<CohortActor> actor: cohortActors) {
// underlyingActor() gives direct access to the CohortActor instance behind the test ref.
316 actor.underlyingActor().verify();
/**
 * Waits up to 5 seconds for {@code future} and returns its result. If the wait throws any
 * Exception, the cohort actors are verified.
 * NOTE(review): the try opener and the remainder of the catch block are not visible in this
 * view - presumably the caught exception is rethrown afterwards; confirm.
 *
 * @param future the future expected to complete successfully
 * @return the value the future completed with
 * @throws Exception declared by the signature; see the note above about the catch block
 */
320 private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
322 return future.get(5, TimeUnit.SECONDS);
323 } catch(Exception e) {
324 verifyCohortActors();
/**
 * Requires {@code future} to complete successfully and asserts that its Boolean result equals
 * {@code expected}.
 *
 * @param future the canCommit future returned by the proxy
 * @param expected the canCommit outcome the test expects
 * @throws Exception propagated from verifySuccessfulFuture
 */
329 private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
330 Boolean actual = verifySuccessfulFuture(future);
331 assertEquals("canCommit", expected, actual);
/**
 * Test actor standing in for a commit cohort. It counts the CanCommitTransaction,
 * CommitTransaction and AbortTransaction messages it receives and answers each with the reply
 * configured in its Builder.
 */
334 private static class CohortActor extends UntypedActor {
// Expected message types, replies and transaction id for this actor.
335 private final Builder builder;
// Number of messages of each kind received so far; checked by verify().
336 private final AtomicInteger canCommitCount = new AtomicInteger();
337 private final AtomicInteger commitCount = new AtomicInteger();
338 private final AtomicInteger abortCount = new AtomicInteger();
// Assertion failure recorded while handling a message and rethrown by verify().
// NOTE(review): presumably volatile because it is written in onReceive and read from the test
// through verify() on a different thread - confirm.
339 private volatile AssertionError assertionError;
341 private CohortActor(Builder builder) {
342 this.builder = builder;
/**
 * Classifies the incoming message as CanCommitTransaction, CommitTransaction or
 * AbortTransaction, increments the matching counter and passes the message to onMessage
 * together with the expected type and reply configured in the builder.
 *
 * @param message the message delivered to this actor
 */
346 public void onReceive(Object message) {
347 if(CanCommitTransaction.isSerializedType(message)) {
348 canCommitCount.incrementAndGet();
// Both the raw message (for the class check) and its deserialized form are passed on.
349 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
350 builder.expCanCommitType, builder.canCommitReply);
351 } else if(CommitTransaction.isSerializedType(message)) {
352 commitCount.incrementAndGet();
353 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
354 builder.expCommitType, builder.commitReply);
355 } else if(AbortTransaction.isSerializedType(message)) {
356 abortCount.incrementAndGet();
357 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
358 builder.expAbortType, builder.abortReply);
// Fallback for a message matching none of the three types: record it as a failure for verify().
// (The else-branch opener is not visible in this view.)
360 assertionError = new AssertionError("Unexpected message " + message);
/**
 * Checks one received message against the configured expectation and sends the configured reply
 * to the sender.
 * NOTE(review): the try opener and the body of the AssertionError handler are not visible in
 * this view - presumably the error is stored in {@code assertionError} for verify(); confirm.
 *
 * @param name message kind, used as a prefix in the assertion messages
 * @param rawMessage the message exactly as received; its class is compared with expType
 * @param actualMessage the deserialized message; its transaction id is compared with the builder's
 * @param expType the message class the builder expects, or null if none was configured
 * @param reply the configured reply; a Throwable is sent back as a failure
 */
364 private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
365 Class<?> expType, Object reply) {
// A null expType means the test configured no expectation for this kind of message.
367 assertNotNull("Unexpected " + name, expType);
368 assertEquals(name + " type", expType, rawMessage.getClass());
369 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID());
// Throwable replies are wrapped in an akka Status.Failure; anything else is sent as-is.
371 if(reply instanceof Throwable) {
372 getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
374 getSender().tell(reply, self());
376 } catch(AssertionError e) {
// Verification of this actor's recorded state: rethrows any assertion failure captured while
// handling messages, then, for each message kind with a configured expectation, asserts that
// exactly one such message was received.
// NOTE(review): the method header is not visible in this view - presumably this is the body of
// the verify() method invoked by verifyCohortActors(); confirm.
382 if(assertionError != null) {
383 throw assertionError;
// A count is only checked when the builder configured an expected type for that message kind.
386 if(builder.expCanCommitType != null) {
387 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
390 if(builder.expCommitType != null) {
391 assertEquals("CommitTransaction count", 1, commitCount.get());
394 if(builder.expAbortType != null) {
395 assertEquals("AbortTransaction count", 1, abortCount.get());
/**
 * Collects the expectations for one CohortActor: for each of the canCommit, commit and abort
 * messages, the message class the actor should receive and the reply it should send, plus the
 * transaction id every message must carry.
 * NOTE(review): the return statements of the two-argument expect methods and the header of the
 * final props-creating method are not visible in this view - presumably each expect method
 * returns this builder for chaining; confirm.
 */
399 static class Builder {
// Expected message classes; a null value means no message of that kind is expected.
400 private Class<?> expCanCommitType;
401 private Class<?> expCommitType;
402 private Class<?> expAbortType;
// Replies sent for each message kind; a Throwable is sent back by the actor as a failure.
403 private Object canCommitReply;
404 private Object commitReply;
405 private Object abortReply;
// Transaction id that every received message is asserted to carry.
406 private final String transactionId;
408 Builder(String transactionId) {
409 this.transactionId = transactionId;
// Expects a canCommit message of the given class and answers it with canCommitReply.
412 Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
413 this.expCanCommitType = expCanCommitType;
414 this.canCommitReply = canCommitReply;
// Same as above, expecting the CanCommitTransaction class.
418 Builder expectCanCommit(Object canCommitReply) {
419 return expectCanCommit(CanCommitTransaction.class, canCommitReply);
// Expects a commit message of the given class and answers it with commitReply.
422 Builder expectCommit(Class<?> expCommitType, Object commitReply) {
423 this.expCommitType = expCommitType;
424 this.commitReply = commitReply;
// Same as above, expecting the CommitTransaction class.
428 Builder expectCommit(Object commitReply) {
429 return expectCommit(CommitTransaction.class, commitReply);
// Expects an abort message of the given class and answers it with abortReply.
432 Builder expectAbort(Class<?> expAbortType, Object abortReply) {
433 this.expAbortType = expAbortType;
434 this.abortReply = abortReply;
// Same as above, expecting the AbortTransaction class.
438 Builder expectAbort(Object abortReply) {
439 return expectAbort(AbortTransaction.class, abortReply);
// Props for a CohortActor constructed with this builder as its constructor argument.
443 return Props.create(CohortActor.class, this);