1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.anyLong;
7 import static org.mockito.Matchers.isA;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import akka.dispatch.Futures;
16 import akka.util.Timeout;
17 import com.codahale.metrics.Snapshot;
18 import com.codahale.metrics.Timer;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.mockito.Mock;
27 import org.mockito.MockitoAnnotations;
28 import org.mockito.stubbing.Stubber;
29 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
40 import scala.concurrent.Future;
// Unit tests for ThreePhaseCommitCohortProxy. The tests drive canCommit(), preCommit(),
// abort() and commit() on the proxy while the ActorContext it talks to is stubbed, so
// cohort replies (or failures) are injected through executeOperationAsync stubs.
42 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
// Marker exception used by the tests to inject a recognizable failure (as a failed cohort
// future or a failed operation reply) and to assert via @Test(expected = ...) that this
// exact type surfaces. The class body is not visible in this excerpt.
44 @SuppressWarnings("serial")
45 static class TestException extends RuntimeException {
// Collaborators stubbed with doReturn(...) in the fixture setup further down.
// NOTE(review): MockitoAnnotations.initMocks(this) is called in that setup, so these are
// presumably @Mock-annotated fields; the annotations are not visible in this view - confirm.

// Context handed to every ThreePhaseCommitCohortProxy created by the tests.
49 private ActorContext actorContext;
// Returned from actorContext.getDatastoreContext(); supplies the commit timeout.
52 private DatastoreContext datastoreContext;
// Returned from actorContext.getOperationTimer("commit").
55 private Timer commitTimer;
// Returned from commitTimer.time().
58 private Timer.Context commitTimerContext;
// Returned from commitTimer.getSnapshot(); its getValue(...) results are stubbed per decile.
61 private Snapshot commitSnapshot;
// Shared fixture setup (the enclosing method header is not visible in this excerpt):
// initializes the Mockito mocks and stubs the ActorContext collaborators used by the tests.
65 MockitoAnnotations.initMocks(this);
// The actor system and client dispatcher both come from the test actor system (getSystem()).
67 doReturn(getSystem()).when(actorContext).getActorSystem();
68 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
69 doReturn(datastoreContext).when(actorContext).getDatastoreContext();
// Commit timeout stub: getShardTransactionCommitTimeoutInSeconds() returns 30.
70 doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
// Wire the "commit" operation timer to the mock timer, timer context and snapshot.
71 doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
72 doReturn(commitTimerContext).when(commitTimer).time();
73 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
// NOTE(review): the stubbed argument is the double i * 0.1, which is not always equal to the
// literal decimal (e.g. 3 * 0.1 evaluates to 0.30000000000000004, not 0.3). Code under test
// only hits these stubs if it computes the quantile the same way - confirm.
74 for(int i=1;i<11;i++){
75 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
76 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
77 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
// Starting transaction-creation limit reported by the mocked context; asserted by later tests.
79 doReturn(10.0).when(actorContext).getTxCreationLimit();
// Creates a DoNothingActor in the test actor system and returns an already-completed
// (successful) future holding an ActorSelection that points at that actor's path.
82 private Future<ActorSelection> newCohort() {
83 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
84 ActorSelection actorSelection = getSystem().actorSelection(path);
85 return Futures.successful(actorSelection);
// Builds a ThreePhaseCommitCohortProxy over the mocked ActorContext with nCohorts
// successfully resolved cohort futures (see newCohort()). Passing 0 yields a proxy with an
// empty cohort list. The third constructor argument is the fixed string "txn-1"
// (presumably the transaction identifier - confirm against the constructor).
88 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
89 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
90 for(int i = 1; i <= nCohorts; i++) {
91 cohortFutures.add(newCohort());
94 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
// Builds a proxy with two cohort futures: the first resolves successfully (newCohort()),
// the second is already failed with a TestException. Used to check how each phase reacts
// when a cohort's actor path could not be obtained.
97 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
99 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
100 cohortFutures.add(newCohort());
// Pre-failed future standing in for the cohort that could not be resolved.
101 cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
103 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
// Stubs actorContext.executeOperationAsync(...) for messages that are instances of
// requestType so that successive invocations return the given responses in order.
//
// requestType - class the request message must be an instance of (matched with isA).
// responses   - one entry per expected invocation; at least one is required because
//               responses[0] is read unconditionally. A Throwable becomes a failed future;
//               anything else is cast to SerializableMessage and its toSerializable() form
//               is wrapped in a successful future.
106 private void setupMockActorContext(Class<?> requestType, Object... responses) {
// First response starts the stubbing chain.
107 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
108 .failed((Throwable) responses[0]) : Futures
109 .successful(((SerializableMessage) responses[0]).toSerializable()));
// Each further response is chained on so it is returned by the next consecutive invocation.
111 for(int i = 1; i < responses.length; i++) {
112 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
113 .failed((Throwable) responses[i]) : Futures
114 .successful(((SerializableMessage) responses[i]).toSerializable()));
// Attach the chain to executeOperationAsync for any target actor and any timeout.
117 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
118 isA(requestType), any(Timeout.class));
// Verifies that actorContext.executeOperationAsync(...) was invoked exactly nCohorts times
// with a message that is an instance of requestType (any target actor, any timeout).
121 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
122 verify(actorContext, times(nCohorts)).executeOperationAsync(
123 any(ActorSelection.class), isA(requestType), any(Timeout.class));
// Waits up to 5 seconds for the future and fails the test if it completes normally, since
// callers expect an ExecutionException.
// NOTE(review): the body of the catch block is not visible in this excerpt; judging by the
// method name and the "throws Throwable" clause it presumably rethrows e.getCause() - confirm.
126 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
129 future.get(5, TimeUnit.SECONDS);
130 fail("Expected ExecutionException");
131 } catch(ExecutionException e) {
// Single cohort: canCommit() yields true when the cohort replies YES and false when it
// replies NO. The same proxy is used for both calls, so two CanCommit requests are expected
// in total.
137 public void testCanCommitWithOneCohort() throws Exception {
139 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
141 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
142 CanCommitTransactionReply.YES);
144 ListenableFuture<Boolean> future = proxy.canCommit();
146 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
// Re-stub the reply as NO and ask the same proxy again.
148 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
149 CanCommitTransactionReply.NO);
151 future = proxy.canCommit();
153 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
// One invocation per canCommit() call above.
155 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
// Two cohorts that both reply YES: canCommit() yields true and each cohort is asked once.
159 public void testCanCommitWithMultipleCohorts() throws Exception {
161 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
163 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
164 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
166 ListenableFuture<Boolean> future = proxy.canCommit();
168 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
170 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
// Three cohorts replying YES, NO, YES: a single NO makes canCommit() yield false. The test
// also expects all three CanCommit requests to have been issued.
174 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
176 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
178 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
179 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
181 ListenableFuture<Boolean> future = proxy.canCommit();
183 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
185 verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
// The CanCommit operation future fails with a TestException; the test expects that
// exception to come out of propagateExecutionExceptionCause(proxy.canCommit()).
188 @Test(expected = TestException.class)
189 public void testCanCommitWithExceptionFailure() throws Throwable {
191 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
193 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
195 propagateExecutionExceptionCause(proxy.canCommit());
// The cohort answers a CanCommit request with a reply of the wrong type (a
// PreCommitTransactionReply); waiting on the canCommit() future is expected to throw
// ExecutionException.
198 @Test(expected = ExecutionException.class)
199 public void testCanCommitWithInvalidResponseType() throws Exception {
201 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
203 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
204 new PreCommitTransactionReply());
206 proxy.canCommit().get(5, TimeUnit.SECONDS);
// One of the cohort path futures is already failed with TestException: canCommit() is
// expected to fail with that exception and no CanCommit request should have been sent.
// NOTE(review): the verification follows a helper call that is expected to throw; lines
// between the statements are not visible here, so it presumably sits in a finally block -
// confirm, otherwise it would never execute.
209 @Test(expected = TestException.class)
210 public void testCanCommitWithFailedCohortPath() throws Throwable {
212 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
215 propagateExecutionExceptionCause(proxy.canCommit());
217 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
// Checks that the preCommit() future completes within 5 seconds for a single cohort. No
// invocation count is verified in the visible lines.
222 public void testPreCommit() throws Exception {
223 // Precommit is currently a no-op
224 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
226 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
227 new PreCommitTransactionReply());
229 proxy.preCommit().get(5, TimeUnit.SECONDS);
// Single cohort replying with AbortTransactionReply: the abort() future completes and
// exactly one Abort request is sent.
233 public void testAbort() throws Exception {
234 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
236 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
238 proxy.abort().get(5, TimeUnit.SECONDS);
240 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
// The Abort operation future fails with a RuntimeException; the abort() future is still
// expected to complete without throwing, and one Abort request is expected to have been sent.
244 public void testAbortWithFailure() throws Exception {
245 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
247 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
249 // The exception should not get propagated.
250 proxy.abort().get(5, TimeUnit.SECONDS);
252 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
// One cohort path future is already failed: abort() is still expected to complete without
// throwing, and no Abort request should have been sent.
256 public void testAbortWithFailedCohortPath() throws Throwable {
258 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
260 // The exception should not get propagated.
261 proxy.abort().get(5, TimeUnit.SECONDS);
263 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
// Two cohorts that both reply with CommitTransactionReply: the commit() future completes
// and each cohort receives one Commit request.
267 public void testCommit() throws Exception {
269 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
271 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
272 new CommitTransactionReply());
274 proxy.commit().get(5, TimeUnit.SECONDS);
276 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
// Two cohorts: the first Commit reply succeeds, the second operation future fails with a
// TestException. The test expects that exception to come out of
// propagateExecutionExceptionCause(proxy.commit()).
279 @Test(expected = TestException.class)
280 public void testCommitWithFailure() throws Throwable {
282 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
284 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
285 new TestException());
287 propagateExecutionExceptionCause(proxy.commit());
// The cohort answers a Commit request with a reply of the wrong type (a
// PreCommitTransactionReply); waiting on the commit() future is expected to throw
// ExecutionException.
290 @Test(expected = ExecutionException.class)
291 public void testCommitWithInvalidResponseType() throws Exception {
293 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
295 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
297 proxy.commit().get(5, TimeUnit.SECONDS);
// One cohort path future is already failed with TestException: commit() is expected to fail
// with that exception, setTxCreationLimit must never be called, and no Commit request
// should have been sent.
// NOTE(review): the verifications follow a helper call that is expected to throw; lines
// between the statements are not visible here, so they presumably sit in a finally block -
// confirm, otherwise they would never execute.
300 @Test(expected = TestException.class)
301 public void testCommitWithFailedCohortPath() throws Throwable {
303 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
306 propagateExecutionExceptionCause(proxy.commit());
309 verify(actorContext, never()).setTxCreationLimit(anyLong());
310 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
// Happy path across all phases with two cohorts: canCommit(), preCommit() and commit() are
// run in sequence and each future must complete within 5 seconds. Afterwards two CanCommit
// and two Commit requests are expected; PreCommit requests are not verified here.
316 public void testAllThreePhasesSuccessful() throws Exception {
318 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
320 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
321 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
323 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
324 new PreCommitTransactionReply(), new PreCommitTransactionReply());
326 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
327 new CommitTransactionReply(), new CommitTransactionReply());
// Sanity check of the fixture: this reads back the 10.0 stubbed on the mocked context.
329 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
331 proxy.canCommit().get(5, TimeUnit.SECONDS);
332 proxy.preCommit().get(5, TimeUnit.SECONDS);
333 proxy.commit().get(5, TimeUnit.SECONDS);
335 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
336 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
// Proxy with zero cohorts (an empty transaction): all three phases must complete, and
// committing must never call setTxCreationLimit on the ActorContext.
341 public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
343 ThreePhaseCommitCohortProxy proxy = setupProxy(0);
// Sanity check of the fixture: this reads back the 10.0 stubbed on the mocked context.
345 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
347 proxy.canCommit().get(5, TimeUnit.SECONDS);
348 proxy.preCommit().get(5, TimeUnit.SECONDS);
349 proxy.commit().get(5, TimeUnit.SECONDS);
351 verify(actorContext, never()).setTxCreationLimit(anyLong());