1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.isA;
7 import static org.mockito.Mockito.doReturn;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import akka.actor.ActorPath;
11 import akka.actor.ActorSelection;
12 import akka.actor.Props;
13 import akka.dispatch.Futures;
14 import akka.util.Timeout;
15 import com.codahale.metrics.Snapshot;
16 import com.codahale.metrics.Timer;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import java.util.List;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.Before;
23 import org.junit.Test;
24 import org.mockito.Mock;
25 import org.mockito.MockitoAnnotations;
26 import org.mockito.stubbing.Stubber;
27 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
38 import scala.concurrent.Future;
39 import scala.concurrent.duration.Duration;
41 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
43 @SuppressWarnings("serial")
44 static class TestException extends RuntimeException {
48 private ActorContext actorContext;
51 private DatastoreContext datastoreContext;
54 private Timer commitTimer;
57 private Timer.Context commitTimerContext;
60 private Snapshot commitSnapshot;
64 MockitoAnnotations.initMocks(this);
66 doReturn(getSystem()).when(actorContext).getActorSystem();
67 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
68 doReturn(datastoreContext).when(actorContext).getDatastoreContext();
69 doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
70 doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
71 doReturn(commitTimerContext).when(commitTimer).time();
72 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
73 for(int i=1;i<11;i++){
74 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
75 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
76 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
78 doReturn(10.0).when(actorContext).getTxCreationLimit();
81 private Future<ActorSelection> newCohort() {
82 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
83 ActorSelection actorSelection = getSystem().actorSelection(path);
84 return Futures.successful(actorSelection);
87 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
88 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
89 for(int i = 1; i <= nCohorts; i++) {
90 cohortFutures.add(newCohort());
93 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
96 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
98 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
99 cohortFutures.add(newCohort());
100 cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
102 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
105 private void setupMockActorContext(Class<?> requestType, Object... responses) {
106 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
107 .failed((Throwable) responses[0]) : Futures
108 .successful(((SerializableMessage) responses[0]).toSerializable()));
110 for(int i = 1; i < responses.length; i++) {
111 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
112 .failed((Throwable) responses[i]) : Futures
113 .successful(((SerializableMessage) responses[i]).toSerializable()));
116 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
117 isA(requestType), any(Timeout.class));
119 doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
120 .when(actorContext).getTransactionCommitOperationTimeout();
123 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
124 verify(actorContext, times(nCohorts)).executeOperationAsync(
125 any(ActorSelection.class), isA(requestType), any(Timeout.class));
128 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
131 future.get(5, TimeUnit.SECONDS);
132 fail("Expected ExecutionException");
133 } catch(ExecutionException e) {
139 public void testCanCommitWithOneCohort() throws Exception {
141 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
143 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
144 CanCommitTransactionReply.YES);
146 ListenableFuture<Boolean> future = proxy.canCommit();
148 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
150 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
151 CanCommitTransactionReply.NO);
153 future = proxy.canCommit();
155 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
157 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
161 public void testCanCommitWithMultipleCohorts() throws Exception {
163 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
165 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
166 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
168 ListenableFuture<Boolean> future = proxy.canCommit();
170 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
172 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
176 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
178 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
180 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
181 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
183 ListenableFuture<Boolean> future = proxy.canCommit();
185 Boolean actual = future.get(5, TimeUnit.SECONDS);
187 assertEquals("canCommit", false, actual);
189 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
192 @Test(expected = TestException.class)
193 public void testCanCommitWithExceptionFailure() throws Throwable {
195 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
197 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
199 propagateExecutionExceptionCause(proxy.canCommit());
202 @Test(expected = ExecutionException.class)
203 public void testCanCommitWithInvalidResponseType() throws Exception {
205 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
207 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
208 new PreCommitTransactionReply());
210 proxy.canCommit().get(5, TimeUnit.SECONDS);
213 @Test(expected = TestException.class)
214 public void testCanCommitWithFailedCohortPath() throws Throwable {
216 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
219 propagateExecutionExceptionCause(proxy.canCommit());
221 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
226 public void testPreCommit() throws Exception {
227 // Precommit is currently a no-op
228 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
230 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
231 new PreCommitTransactionReply());
233 proxy.preCommit().get(5, TimeUnit.SECONDS);
237 public void testAbort() throws Exception {
238 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
240 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
242 proxy.abort().get(5, TimeUnit.SECONDS);
244 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
248 public void testAbortWithFailure() throws Exception {
249 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
251 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
253 // The exception should not get propagated.
254 proxy.abort().get(5, TimeUnit.SECONDS);
256 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
260 public void testAbortWithFailedCohortPath() throws Throwable {
262 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
264 // The exception should not get propagated.
265 proxy.abort().get(5, TimeUnit.SECONDS);
267 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
271 public void testCommit() throws Exception {
273 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
275 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
276 new CommitTransactionReply());
278 proxy.commit().get(5, TimeUnit.SECONDS);
280 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
283 @Test(expected = TestException.class)
284 public void testCommitWithFailure() throws Throwable {
286 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
288 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
289 new TestException());
291 propagateExecutionExceptionCause(proxy.commit());
294 @Test(expected = ExecutionException.class)
295 public void testCommitWithInvalidResponseType() throws Exception {
297 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
299 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
301 proxy.commit().get(5, TimeUnit.SECONDS);
304 @Test(expected = TestException.class)
305 public void testCommitWithFailedCohortPath() throws Throwable {
307 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
310 propagateExecutionExceptionCause(proxy.commit());
313 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
319 public void testAllThreePhasesSuccessful() throws Exception {
321 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
323 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
324 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
326 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
327 new PreCommitTransactionReply(), new PreCommitTransactionReply());
329 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
330 new CommitTransactionReply(), new CommitTransactionReply());
332 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
334 proxy.canCommit().get(5, TimeUnit.SECONDS);
335 proxy.preCommit().get(5, TimeUnit.SECONDS);
336 proxy.commit().get(5, TimeUnit.SECONDS);
338 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
339 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
344 public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
346 ThreePhaseCommitCohortProxy proxy = setupProxy(0);
348 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
350 proxy.canCommit().get(5, TimeUnit.SECONDS);
351 proxy.preCommit().get(5, TimeUnit.SECONDS);
352 proxy.commit().get(5, TimeUnit.SECONDS);