1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.anyLong;
7 import static org.mockito.Matchers.isA;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import akka.dispatch.Futures;
16 import akka.util.Timeout;
17 import com.codahale.metrics.Snapshot;
18 import com.codahale.metrics.Timer;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.mockito.Mock;
27 import org.mockito.MockitoAnnotations;
28 import org.mockito.stubbing.Stubber;
29 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.Duration;
43 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
45 @SuppressWarnings("serial")
46 static class TestException extends RuntimeException {
50 private ActorContext actorContext;
53 private DatastoreContext datastoreContext;
56 private Timer commitTimer;
59 private Timer.Context commitTimerContext;
62 private Snapshot commitSnapshot;
66 MockitoAnnotations.initMocks(this);
68 doReturn(getSystem()).when(actorContext).getActorSystem();
69 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
70 doReturn(datastoreContext).when(actorContext).getDatastoreContext();
71 doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
72 doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
73 doReturn(commitTimerContext).when(commitTimer).time();
74 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
75 for(int i=1;i<11;i++){
76 // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
77 // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
78 doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
80 doReturn(10.0).when(actorContext).getTxCreationLimit();
83 private Future<ActorSelection> newCohort() {
84 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
85 ActorSelection actorSelection = getSystem().actorSelection(path);
86 return Futures.successful(actorSelection);
89 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
90 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
91 for(int i = 1; i <= nCohorts; i++) {
92 cohortFutures.add(newCohort());
95 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
98 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
100 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
101 cohortFutures.add(newCohort());
102 cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
104 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
107 private void setupMockActorContext(Class<?> requestType, Object... responses) {
108 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
109 .failed((Throwable) responses[0]) : Futures
110 .successful(((SerializableMessage) responses[0]).toSerializable()));
112 for(int i = 1; i < responses.length; i++) {
113 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
114 .failed((Throwable) responses[i]) : Futures
115 .successful(((SerializableMessage) responses[i]).toSerializable()));
118 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
119 isA(requestType), any(Timeout.class));
121 doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
122 .when(actorContext).getTransactionCommitOperationTimeout();
125 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
126 verify(actorContext, times(nCohorts)).executeOperationAsync(
127 any(ActorSelection.class), isA(requestType), any(Timeout.class));
130 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
133 future.get(5, TimeUnit.SECONDS);
134 fail("Expected ExecutionException");
135 } catch(ExecutionException e) {
141 public void testCanCommitWithOneCohort() throws Exception {
143 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
145 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
146 CanCommitTransactionReply.YES);
148 ListenableFuture<Boolean> future = proxy.canCommit();
150 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
152 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
153 CanCommitTransactionReply.NO);
155 future = proxy.canCommit();
157 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
159 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
163 public void testCanCommitWithMultipleCohorts() throws Exception {
165 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
167 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
168 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
170 ListenableFuture<Boolean> future = proxy.canCommit();
172 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
174 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
178 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
180 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
182 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
183 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
185 ListenableFuture<Boolean> future = proxy.canCommit();
187 Boolean actual = future.get(5, TimeUnit.SECONDS);
189 assertEquals("canCommit", false, actual);
191 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
194 @Test(expected = TestException.class)
195 public void testCanCommitWithExceptionFailure() throws Throwable {
197 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
199 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
201 propagateExecutionExceptionCause(proxy.canCommit());
204 @Test(expected = ExecutionException.class)
205 public void testCanCommitWithInvalidResponseType() throws Exception {
207 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
209 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
210 new PreCommitTransactionReply());
212 proxy.canCommit().get(5, TimeUnit.SECONDS);
215 @Test(expected = TestException.class)
216 public void testCanCommitWithFailedCohortPath() throws Throwable {
218 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
221 propagateExecutionExceptionCause(proxy.canCommit());
223 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
228 public void testPreCommit() throws Exception {
229 // Precommit is currently a no-op
230 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
232 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
233 new PreCommitTransactionReply());
235 proxy.preCommit().get(5, TimeUnit.SECONDS);
239 public void testAbort() throws Exception {
240 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
242 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
244 proxy.abort().get(5, TimeUnit.SECONDS);
246 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
250 public void testAbortWithFailure() throws Exception {
251 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
253 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
255 // The exception should not get propagated.
256 proxy.abort().get(5, TimeUnit.SECONDS);
258 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
262 public void testAbortWithFailedCohortPath() throws Throwable {
264 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
266 // The exception should not get propagated.
267 proxy.abort().get(5, TimeUnit.SECONDS);
269 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
273 public void testCommit() throws Exception {
275 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
277 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
278 new CommitTransactionReply());
280 proxy.commit().get(5, TimeUnit.SECONDS);
282 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
285 @Test(expected = TestException.class)
286 public void testCommitWithFailure() throws Throwable {
288 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
290 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
291 new TestException());
293 propagateExecutionExceptionCause(proxy.commit());
296 @Test(expected = ExecutionException.class)
297 public void testCommitWithInvalidResponseType() throws Exception {
299 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
301 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
303 proxy.commit().get(5, TimeUnit.SECONDS);
306 @Test(expected = TestException.class)
307 public void testCommitWithFailedCohortPath() throws Throwable {
309 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
312 propagateExecutionExceptionCause(proxy.commit());
315 verify(actorContext, never()).setTxCreationLimit(anyLong());
316 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
322 public void testAllThreePhasesSuccessful() throws Exception {
324 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
326 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
327 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
329 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
330 new PreCommitTransactionReply(), new PreCommitTransactionReply());
332 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
333 new CommitTransactionReply(), new CommitTransactionReply());
335 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
337 proxy.canCommit().get(5, TimeUnit.SECONDS);
338 proxy.preCommit().get(5, TimeUnit.SECONDS);
339 proxy.commit().get(5, TimeUnit.SECONDS);
341 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
342 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
347 public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
349 ThreePhaseCommitCohortProxy proxy = setupProxy(0);
351 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
353 proxy.canCommit().get(5, TimeUnit.SECONDS);
354 proxy.preCommit().get(5, TimeUnit.SECONDS);
355 proxy.commit().get(5, TimeUnit.SECONDS);
357 verify(actorContext, never()).setTxCreationLimit(anyLong());