2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.isA;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.times;
17 import static org.mockito.Mockito.verify;
18 import akka.actor.ActorPath;
19 import akka.actor.ActorSelection;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import akka.util.Timeout;
23 import com.codahale.metrics.Snapshot;
24 import com.codahale.metrics.Timer;
25 import com.google.common.collect.Lists;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import java.util.List;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mock;
33 import org.mockito.MockitoAnnotations;
34 import org.mockito.stubbing.Stubber;
35 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
44 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
45 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
46 import scala.concurrent.Future;
47 import scala.concurrent.duration.Duration;
49 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
51 @SuppressWarnings("serial")
52 static class TestException extends RuntimeException {
56 private ActorContext actorContext;
59 private DatastoreContext datastoreContext;
62 private Timer commitTimer;
65 private Timer.Context commitTimerContext;
68 private Snapshot commitSnapshot;
72 MockitoAnnotations.initMocks(this);
74 doReturn(getSystem()).when(actorContext).getActorSystem();
75 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
76 doReturn(datastoreContext).when(actorContext).getDatastoreContext();
77 doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
78 doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
79 doReturn(commitTimerContext).when(commitTimer).time();
80 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
81 for(int i=1;i<11;i++){
82 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
83 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
84 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
86 doReturn(10.0).when(actorContext).getTxCreationLimit();
89 private Future<ActorSelection> newCohort() {
90 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
91 ActorSelection actorSelection = getSystem().actorSelection(path);
92 return Futures.successful(actorSelection);
95 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
96 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
97 for(int i = 1; i <= nCohorts; i++) {
98 cohortFutures.add(newCohort());
101 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
104 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
106 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
107 cohortFutures.add(newCohort());
108 cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
110 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
113 private void setupMockActorContext(Class<?> requestType, Object... responses) {
114 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
115 .failed((Throwable) responses[0]) : Futures
116 .successful(((SerializableMessage) responses[0]).toSerializable()));
118 for(int i = 1; i < responses.length; i++) {
119 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
120 .failed((Throwable) responses[i]) : Futures
121 .successful(((SerializableMessage) responses[i]).toSerializable()));
124 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
125 isA(requestType), any(Timeout.class));
127 doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
128 .when(actorContext).getTransactionCommitOperationTimeout();
131 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
132 verify(actorContext, times(nCohorts)).executeOperationAsync(
133 any(ActorSelection.class), isA(requestType), any(Timeout.class));
136 private static void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
139 future.get(5, TimeUnit.SECONDS);
140 fail("Expected ExecutionException");
141 } catch(ExecutionException e) {
147 public void testCanCommitWithOneCohort() throws Exception {
149 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
151 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
152 CanCommitTransactionReply.YES);
154 ListenableFuture<Boolean> future = proxy.canCommit();
156 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
158 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
159 CanCommitTransactionReply.NO);
161 future = proxy.canCommit();
163 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
165 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
169 public void testCanCommitWithMultipleCohorts() throws Exception {
171 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
173 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
174 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
176 ListenableFuture<Boolean> future = proxy.canCommit();
178 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
180 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
184 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
186 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
188 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
189 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
191 ListenableFuture<Boolean> future = proxy.canCommit();
193 Boolean actual = future.get(5, TimeUnit.SECONDS);
195 assertEquals("canCommit", false, actual);
197 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
200 @Test(expected = TestException.class)
201 public void testCanCommitWithExceptionFailure() throws Throwable {
203 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
205 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
207 propagateExecutionExceptionCause(proxy.canCommit());
210 @Test(expected = ExecutionException.class)
211 public void testCanCommitWithInvalidResponseType() throws Exception {
213 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
215 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
216 new PreCommitTransactionReply());
218 proxy.canCommit().get(5, TimeUnit.SECONDS);
221 @Test(expected = TestException.class)
222 public void testCanCommitWithFailedCohortPath() throws Throwable {
224 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
227 propagateExecutionExceptionCause(proxy.canCommit());
229 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
234 public void testPreCommit() throws Exception {
235 // Precommit is currently a no-op
236 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
238 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
239 new PreCommitTransactionReply());
241 proxy.preCommit().get(5, TimeUnit.SECONDS);
245 public void testAbort() throws Exception {
246 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
248 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
250 proxy.abort().get(5, TimeUnit.SECONDS);
252 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
256 public void testAbortWithFailure() throws Exception {
257 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
259 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
261 // The exception should not get propagated.
262 proxy.abort().get(5, TimeUnit.SECONDS);
264 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
268 public void testAbortWithFailedCohortPath() throws Throwable {
270 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
272 // The exception should not get propagated.
273 proxy.abort().get(5, TimeUnit.SECONDS);
275 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
279 public void testCommit() throws Exception {
281 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
283 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
284 new CommitTransactionReply());
286 proxy.commit().get(5, TimeUnit.SECONDS);
288 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
291 @Test(expected = TestException.class)
292 public void testCommitWithFailure() throws Throwable {
294 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
296 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
297 new TestException());
299 propagateExecutionExceptionCause(proxy.commit());
302 @Test(expected = ExecutionException.class)
303 public void testCommitWithInvalidResponseType() throws Exception {
305 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
307 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
309 proxy.commit().get(5, TimeUnit.SECONDS);
312 @Test(expected = TestException.class)
313 public void testCommitWithFailedCohortPath() throws Throwable {
315 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
318 propagateExecutionExceptionCause(proxy.commit());
321 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
327 public void testAllThreePhasesSuccessful() throws Exception {
329 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
331 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
332 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
334 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
335 new PreCommitTransactionReply(), new PreCommitTransactionReply());
337 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
338 new CommitTransactionReply(), new CommitTransactionReply());
340 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
342 proxy.canCommit().get(5, TimeUnit.SECONDS);
343 proxy.preCommit().get(5, TimeUnit.SECONDS);
344 proxy.commit().get(5, TimeUnit.SECONDS);
346 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
347 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
352 public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
354 ThreePhaseCommitCohortProxy proxy = setupProxy(0);
356 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
358 proxy.canCommit().get(5, TimeUnit.SECONDS);
359 proxy.preCommit().get(5, TimeUnit.SECONDS);
360 proxy.commit().get(5, TimeUnit.SECONDS);