1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.anyLong;
7 import static org.mockito.Matchers.isA;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.timeout;
11 import static org.mockito.Mockito.times;
12 import static org.mockito.Mockito.verify;
13 import akka.actor.ActorPath;
14 import akka.actor.ActorSelection;
15 import akka.actor.Props;
16 import akka.dispatch.Futures;
17 import akka.util.Timeout;
18 import com.codahale.metrics.Snapshot;
19 import com.codahale.metrics.Timer;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import java.util.List;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mock;
28 import org.mockito.MockitoAnnotations;
29 import org.mockito.stubbing.Stubber;
30 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
41 import scala.concurrent.Future;
/**
 * Unit tests for {@code ThreePhaseCommitCohortProxy}.
 *
 * <p>The {@code ActorContext} handed to the proxy is stubbed with Mockito, so the replies to
 * {@code executeOperationAsync} are canned per test. The cohort actors themselves are
 * {@code DoNothingActor} instances created in the actor system returned by {@code getSystem()}
 * (not defined in this listing; presumably inherited from {@code AbstractActorTest}).
 */
43 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
/**
 * Unchecked exception used by the tests in this class to simulate a failure: it is placed into
 * failed futures and then matched with {@code @Test(expected = TestException.class)}.
 */
45 @SuppressWarnings("serial")
46 static class TestException extends RuntimeException {
// Collaborators that the fixture setup stubs via doReturn(...).when(...).
// NOTE(review): the annotation lines are not visible in this listing; these are presumably
// @Mock fields populated by MockitoAnnotations.initMocks(this) - confirm.

// Context passed to every ThreePhaseCommitCohortProxy created by the helpers below.
50 private ActorContext actorContext;
// Returned from actorContext.getDatastoreContext(); supplies the commit timeout.
53 private DatastoreContext datastoreContext;
// Returned from actorContext.getOperationTimer("commit").
56 private Timer commitTimer;
// Returned from commitTimer.time().
59 private Timer.Context commitTimerContext;
// Returned from commitTimer.getSnapshot(); supplies the stubbed 95th percentile.
62 private Snapshot commitSnapshot;
// Fixture setup: wires the stubbed collaborators together.
// NOTE(review): the enclosing method header is not visible in this listing; presumably this
// is the JUnit @Before method - confirm.

// Populate the Mockito-annotated fields of this test instance.
66 MockitoAnnotations.initMocks(this);
// Point the stubbed context at the real test actor system and its default dispatcher.
68 doReturn(getSystem()).when(actorContext).getActorSystem();
69 doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
70 doReturn(datastoreContext).when(actorContext).getDatastoreContext();
// Commit timeout reported by the datastore context: 100 seconds.
71 doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
// Commit timer chain: "commit" timer -> timing context / snapshot.
72 doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
73 doReturn(commitTimerContext).when(commitTimer).time();
74 doReturn(commitSnapshot).when(commitTimer).getSnapshot();
// 95th percentile of 2000 ms, expressed in nanoseconds as a double.
75 doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile();
// Initial transaction creation limit; two tests assert this value before committing.
76 doReturn(10.0).when(actorContext).getTxCreationLimit();
/**
 * Creates a {@code DoNothingActor} in the test actor system and wraps a selection of its path
 * in an already-successful future.
 *
 * @return a completed future holding the {@code ActorSelection} of the new actor
 */
79 private Future<ActorSelection> newCohort() {
80 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
81 ActorSelection actorSelection = getSystem().actorSelection(path);
82 return Futures.successful(actorSelection);
/**
 * Builds a proxy for transaction id {@code "txn-1"} backed by {@code nCohorts} cohort futures,
 * each created by {@code newCohort()}.
 *
 * @param nCohorts number of cohorts to create; {@code 0} yields a proxy with an empty cohort list
 * @return the proxy under test, using the stubbed {@code actorContext}
 */
85 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
86 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
87 for(int i = 1; i <= nCohorts; i++) {
88 cohortFutures.add(newCohort());
91 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
/**
 * Builds a proxy for transaction id {@code "txn-1"} with two cohort futures: one that resolved
 * successfully and one that already failed with a {@code TestException}.
 *
 * @return the proxy under test, using the stubbed {@code actorContext}
 */
94 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
96 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
97 cohortFutures.add(newCohort());
// Second cohort: its path lookup is simulated as failed.
98 cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
100 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
/**
 * Stubs {@code actorContext.executeOperationAsync} for requests of the given type so that
 * consecutive invocations return the given responses in order.
 *
 * <p>A response that is a {@code Throwable} becomes a failed future; any other response is cast
 * to {@code SerializableMessage} and its {@code toSerializable()} form is wrapped in a
 * successful future.
 *
 * @param requestType class the request message must be an instance of (matched with {@code isA})
 * @param responses   replies to hand out, one per invocation; must contain at least one element
 *                    because {@code responses[0]} is read unconditionally
 */
103 private void setupMockActorContext(Class<?> requestType, Object... responses) {
// First reply starts the stubbing chain.
104 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
105 .failed((Throwable) responses[0]) : Futures
106 .successful(((SerializableMessage) responses[0]).toSerializable()));
// Remaining replies are appended to the chain in order.
108 for(int i = 1; i < responses.length; i++) {
109 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
110 .failed((Throwable) responses[i]) : Futures
111 .successful(((SerializableMessage) responses[i]).toSerializable()));
// Attach the chain to executeOperationAsync for any selection, any timeout, and a message
// of the requested type.
114 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
115 isA(requestType), any(Timeout.class));
/**
 * Verifies that {@code actorContext.executeOperationAsync} was invoked exactly
 * {@code nCohorts} times with a message of the given type (any selection, any timeout).
 *
 * <p>The count covers all invocations recorded on the mock so far, so a test that runs the same
 * phase twice on one cohort passes {@code 2} (see {@code testCanCommitWithOneCohort}).
 *
 * @param nCohorts    expected number of invocations
 * @param requestType class the request message must be an instance of
 */
118 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
119 verify(actorContext, times(nCohorts)).executeOperationAsync(
120 any(ActorSelection.class), isA(requestType), any(Timeout.class));
/**
 * Waits up to 5 seconds for a future that is expected to fail with an
 * {@code ExecutionException}, and fails the test if it completes normally.
 *
 * <p>NOTE(review): the body of the catch clause is not visible in this listing; judging by the
 * method name and the {@code throws Throwable} clause it presumably rethrows
 * {@code e.getCause()} so callers can use {@code @Test(expected = ...)} on the cause - confirm.
 *
 * @param future the future expected to complete exceptionally
 */
123 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
126 future.get(5, TimeUnit.SECONDS);
// Reaching this line means the future completed normally, which is a test failure.
127 fail("Expected ExecutionException");
128 } catch(ExecutionException e) {
/**
 * With a single cohort, {@code canCommit()} yields {@code true} when the cohort replies YES and
 * {@code false} when it replies NO.
 */
134 public void testCanCommitWithOneCohort() throws Exception {
136 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
138 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
139 CanCommitTransactionReply.YES);
141 ListenableFuture<Boolean> future = proxy.canCommit();
143 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
// Re-stub the same request type with a NO reply and ask again on the same proxy.
145 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
146 CanCommitTransactionReply.NO);
148 future = proxy.canCommit();
150 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
// Two canCommit() calls against one cohort: two invocations in total.
152 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
/**
 * With two cohorts that both reply YES, {@code canCommit()} yields {@code true} and each cohort
 * is asked once.
 */
156 public void testCanCommitWithMultipleCohorts() throws Exception {
158 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
160 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
161 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
163 ListenableFuture<Boolean> future = proxy.canCommit();
165 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
167 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
/**
 * With three cohorts replying YES, NO, YES, {@code canCommit()} yields {@code false}; the test
 * also expects three canCommit invocations in total.
 */
171 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
173 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
175 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
176 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
178 ListenableFuture<Boolean> future = proxy.canCommit();
180 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
182 verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
/**
 * When the canCommit request completes with a failed future carrying a {@code TestException},
 * the test expects that {@code TestException} to surface from the returned future.
 */
185 @Test(expected = TestException.class)
186 public void testCanCommitWithExceptionFailure() throws Throwable {
188 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
190 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
192 propagateExecutionExceptionCause(proxy.canCommit());
/**
 * When the cohort answers a canCommit request with a {@code PreCommitTransactionReply},
 * waiting on the returned future is expected to throw an {@code ExecutionException}.
 */
195 @Test(expected = ExecutionException.class)
196 public void testCanCommitWithInvalidResponseType() throws Exception {
198 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
// Deliberately the wrong reply type for this request.
200 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
201 new PreCommitTransactionReply());
203 proxy.canCommit().get(5, TimeUnit.SECONDS);
/**
 * When one cohort future has already failed with a {@code TestException}, {@code canCommit()}
 * is expected to fail with that exception and no canCommit request is expected to be sent.
 */
206 @Test(expected = TestException.class)
207 public void testCanCommitWithFailedCohortPath() throws Throwable {
209 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
212 propagateExecutionExceptionCause(proxy.canCommit());
// NOTE(review): the call above is expected to throw, so this verification only runs if it
// sits in a finally block; the try/finally lines are not visible in this listing - confirm.
214 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
/**
 * {@code preCommit()} completes within 5 seconds for a single cohort. No invocation count is
 * verified here.
 */
219 public void testPreCommit() throws Exception {
220 // Precommit is currently a no-op
221 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
223 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
224 new PreCommitTransactionReply());
226 proxy.preCommit().get(5, TimeUnit.SECONDS);
/**
 * {@code abort()} completes for a single cohort that replies with an
 * {@code AbortTransactionReply}, and exactly one abort request is sent.
 */
230 public void testAbort() throws Exception {
231 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
233 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
235 proxy.abort().get(5, TimeUnit.SECONDS);
237 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
/**
 * When the abort request completes with a failed future, {@code abort()} is still expected to
 * complete normally, and exactly one abort request is sent.
 */
241 public void testAbortWithFailure() throws Exception {
242 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
244 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
246 // The exception should not get propagated.
247 proxy.abort().get(5, TimeUnit.SECONDS);
249 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
/**
 * When one cohort future has already failed, {@code abort()} is still expected to complete
 * normally and no abort request is expected to be sent.
 */
253 public void testAbortWithFailedCohortPath() throws Throwable {
255 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
257 // The exception should not get propagated.
258 proxy.abort().get(5, TimeUnit.SECONDS);
260 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
/**
 * {@code commit()} completes when both of two cohorts reply with a
 * {@code CommitTransactionReply}, and each cohort receives one commit request.
 */
264 public void testCommit() throws Exception {
266 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
268 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
269 new CommitTransactionReply());
271 proxy.commit().get(5, TimeUnit.SECONDS);
273 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
/**
 * With two cohorts where the first commit request succeeds and the second completes with a
 * failed future carrying a {@code TestException}, the test expects that exception to surface
 * from {@code commit()}.
 */
276 @Test(expected = TestException.class)
277 public void testCommitWithFailure() throws Throwable {
279 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
281 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
282 new TestException());
284 propagateExecutionExceptionCause(proxy.commit());
/**
 * When the cohort answers a commit request with a {@code PreCommitTransactionReply}, waiting
 * on the returned future is expected to throw an {@code ExecutionException}.
 */
287 @Test(expected = ExecutionException.class)
288 public void testCommitWithInvalidResponseType() throws Exception {
290 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
// Deliberately the wrong reply type for this request.
292 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
294 proxy.commit().get(5, TimeUnit.SECONDS);
/**
 * When one cohort future has already failed with a {@code TestException}, {@code commit()} is
 * expected to fail with that exception, the transaction creation limit must not be changed,
 * and no commit request is expected to be sent.
 */
297 @Test(expected = TestException.class)
298 public void testCommitWithFailedCohortPath() throws Throwable {
300 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
303 propagateExecutionExceptionCause(proxy.commit());
// NOTE(review): the call above is expected to throw, so the verifications below only run if
// they sit in a finally block; the try/finally lines are not visible in this listing - confirm.
// NOTE(review): setTxCreationLimit is called with a double elsewhere in this file (0.5 in
// testAllThreePhasesSuccessful); depending on the Mockito version a long-typed matcher may
// not match a double argument, which would make this never() check vacuous - consider
// anyDouble(); confirm.
306 verify(actorContext, never()).setTxCreationLimit(anyLong());
307 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
/**
 * Runs canCommit, preCommit and commit in sequence against two cohorts that reply successfully
 * to every phase, then checks the request counts and that the transaction creation limit is
 * updated.
 */
313 public void testAllThreePhasesSuccessful() throws Exception {
315 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
317 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
318 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
320 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
321 new PreCommitTransactionReply(), new PreCommitTransactionReply());
323 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
324 new CommitTransactionReply(), new CommitTransactionReply());
// Sanity check of the stubbed starting value (10.0 comes from the fixture setup).
326 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
328 proxy.canCommit().get(5, TimeUnit.SECONDS);
329 proxy.preCommit().get(5, TimeUnit.SECONDS);
330 proxy.commit().get(5, TimeUnit.SECONDS);
// Only canCommit and commit request counts are verified; preCommit requests are not.
332 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
333 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
335 // Verify that the creation limit was changed to 0.5 (based on setup)
// timeout(5000) lets the verification wait up to 5 seconds for the call to be recorded.
336 verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
/**
 * With a proxy that has no cohorts, running all three phases must not change the transaction
 * creation limit.
 */
340 public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
342 ThreePhaseCommitCohortProxy proxy = setupProxy(0);
// Sanity check of the stubbed starting value (10.0 comes from the fixture setup).
344 assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
346 proxy.canCommit().get(5, TimeUnit.SECONDS);
347 proxy.preCommit().get(5, TimeUnit.SECONDS);
348 proxy.commit().get(5, TimeUnit.SECONDS);
// NOTE(review): setTxCreationLimit is called with a double elsewhere in this file (0.5 in
// testAllThreePhasesSuccessful); depending on the Mockito version a long-typed matcher may
// not match a double argument, which would make this never() check vacuous - consider
// anyDouble(); confirm.
350 verify(actorContext, never()).setTxCreationLimit(anyLong());