2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.isA;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.times;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
19 import akka.actor.ActorPath;
20 import akka.actor.ActorSelection;
21 import akka.actor.Props;
22 import akka.dispatch.Futures;
23 import akka.util.Timeout;
24 import com.codahale.metrics.Snapshot;
25 import com.codahale.metrics.Timer;
26 import com.google.common.collect.Lists;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import java.util.List;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.mockito.Mock;
34 import org.mockito.MockitoAnnotations;
35 import org.mockito.stubbing.Stubber;
36 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
43 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import scala.concurrent.Future;
46 import scala.concurrent.duration.Duration;
48 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
50 @SuppressWarnings("serial")
51 static class TestException extends RuntimeException {
55 private ActorContext actorContext;
58 private DatastoreContext datastoreContext;
61 private Timer commitTimer;
64 private Timer.Context commitTimerContext;
67 private Snapshot commitSnapshot;
71 MockitoAnnotations.initMocks(this);
73 doReturn(getSystem()).when(actorContext).getActorSystem();
74 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
75 doReturn(datastoreContext).when(actorContext).getDatastoreContext();
76 doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
77 doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
78 doReturn(commitTimerContext).when(commitTimer).time();
79 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
80 for(int i=1;i<11;i++){
81 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
82 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
83 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
85 doReturn(10.0).when(actorContext).getTxCreationLimit();
88 private Future<ActorSelection> newCohort() {
89 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
90 ActorSelection actorSelection = getSystem().actorSelection(path);
91 return Futures.successful(actorSelection);
94 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
95 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
96 for(int i = 1; i <= nCohorts; i++) {
97 cohortFutures.add(newCohort());
100 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
103 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
105 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
106 cohortFutures.add(newCohort());
107 cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
109 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
112 private void setupMockActorContext(Class<?> requestType, Object... responses) {
113 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
114 .failed((Throwable) responses[0]) : Futures
115 .successful(((SerializableMessage) responses[0]).toSerializable()));
117 for(int i = 1; i < responses.length; i++) {
118 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
119 .failed((Throwable) responses[i]) : Futures
120 .successful(((SerializableMessage) responses[i]).toSerializable()));
123 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
124 isA(requestType), any(Timeout.class));
126 doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
127 .when(actorContext).getTransactionCommitOperationTimeout();
130 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
131 verify(actorContext, times(nCohorts)).executeOperationAsync(
132 any(ActorSelection.class), isA(requestType), any(Timeout.class));
135 private static void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
138 future.get(5, TimeUnit.SECONDS);
139 fail("Expected ExecutionException");
140 } catch(ExecutionException e) {
146 public void testCanCommitWithOneCohort() throws Exception {
148 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
150 setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.yes(CURRENT_VERSION));
152 ListenableFuture<Boolean> future = proxy.canCommit();
154 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
156 setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.no(CURRENT_VERSION));
158 future = proxy.canCommit();
160 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
162 verifyCohortInvocations(2, CanCommitTransaction.class);
166 public void testCanCommitWithMultipleCohorts() throws Exception {
168 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
170 setupMockActorContext(CanCommitTransaction.class,
171 CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
173 ListenableFuture<Boolean> future = proxy.canCommit();
175 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
177 verifyCohortInvocations(2, CanCommitTransaction.class);
181 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
183 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
185 setupMockActorContext(CanCommitTransaction.class,
186 CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.no(CURRENT_VERSION),
187 CanCommitTransactionReply.yes(CURRENT_VERSION));
189 ListenableFuture<Boolean> future = proxy.canCommit();
191 Boolean actual = future.get(5, TimeUnit.SECONDS);
193 assertEquals("canCommit", false, actual);
195 verifyCohortInvocations(2, CanCommitTransaction.class);
198 @Test(expected = TestException.class)
199 public void testCanCommitWithExceptionFailure() throws Throwable {
201 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
203 setupMockActorContext(CanCommitTransaction.class, new TestException());
205 propagateExecutionExceptionCause(proxy.canCommit());
208 @Test(expected = ExecutionException.class)
209 public void testCanCommitWithInvalidResponseType() throws Exception {
211 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
213 setupMockActorContext(CanCommitTransaction.class,
214 new CommitTransactionReply());
216 proxy.canCommit().get(5, TimeUnit.SECONDS);
219 @Test(expected = TestException.class)
220 public void testCanCommitWithFailedCohortPath() throws Throwable {
222 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
225 propagateExecutionExceptionCause(proxy.canCommit());
227 verifyCohortInvocations(0, CanCommitTransaction.class);
232 public void testPreCommit() throws Exception {
233 // Precommit is currently a no-op
234 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
236 proxy.preCommit().get(5, TimeUnit.SECONDS);
240 public void testAbort() throws Exception {
241 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
243 setupMockActorContext(AbortTransaction.class, new AbortTransactionReply());
245 proxy.abort().get(5, TimeUnit.SECONDS);
247 verifyCohortInvocations(1, AbortTransaction.class);
251 public void testAbortWithFailure() throws Exception {
252 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
254 setupMockActorContext(AbortTransaction.class, new RuntimeException("mock"));
256 // The exception should not get propagated.
257 proxy.abort().get(5, TimeUnit.SECONDS);
259 verifyCohortInvocations(1, AbortTransaction.class);
263 public void testAbortWithFailedCohortPath() throws Throwable {
265 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
267 // The exception should not get propagated.
268 proxy.abort().get(5, TimeUnit.SECONDS);
270 verifyCohortInvocations(0, AbortTransaction.class);
274 public void testCommit() throws Exception {
276 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
278 setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
279 new CommitTransactionReply());
281 proxy.commit().get(5, TimeUnit.SECONDS);
283 verifyCohortInvocations(2, CommitTransaction.class);
286 @Test(expected = TestException.class)
287 public void testCommitWithFailure() throws Throwable {
289 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
291 setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
292 new TestException());
294 propagateExecutionExceptionCause(proxy.commit());
297 @Test(expected = ExecutionException.class)
298 public void testCommitWithInvalidResponseType() throws Exception {
300 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
302 setupMockActorContext(CommitTransaction.class, new AbortTransactionReply());
304 proxy.commit().get(5, TimeUnit.SECONDS);
307 @Test(expected = TestException.class)
308 public void testCommitWithFailedCohortPath() throws Throwable {
310 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
313 propagateExecutionExceptionCause(proxy.commit());
316 verifyCohortInvocations(0, CommitTransaction.class);
322 public void testAllThreePhasesSuccessful() throws Exception {
324 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
326 setupMockActorContext(CanCommitTransaction.class,
327 CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
329 setupMockActorContext(CommitTransaction.class,
330 new CommitTransactionReply(), new CommitTransactionReply());
332 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
334 proxy.canCommit().get(5, TimeUnit.SECONDS);
335 proxy.preCommit().get(5, TimeUnit.SECONDS);
336 proxy.commit().get(5, TimeUnit.SECONDS);
338 verifyCohortInvocations(2, CanCommitTransaction.class);
339 verifyCohortInvocations(2, CommitTransaction.class);
344 public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
346 ThreePhaseCommitCohortProxy proxy = setupProxy(0);
348 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
350 proxy.canCommit().get(5, TimeUnit.SECONDS);
351 proxy.preCommit().get(5, TimeUnit.SECONDS);
352 proxy.commit().get(5, TimeUnit.SECONDS);