1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorSelection;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
8 import com.google.common.collect.Lists;
9 import com.google.common.util.concurrent.ListenableFuture;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.any;
15 import static org.mockito.Mockito.isA;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.times;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.Mock;
23 import org.mockito.MockitoAnnotations;
24 import org.mockito.stubbing.Stubber;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
37 import scala.concurrent.Future;
38 import scala.concurrent.duration.FiniteDuration;
40 import java.util.List;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
// Unit tests for ThreePhaseCommitCohortProxy. The proxy is constructed around an
// ActorContext whose executeRemoteOperationAsync calls are stubbed with Mockito, so each
// test scripts the cohort replies and then inspects the future returned by
// canCommit(), preCommit(), abort() or commit().
44 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
// Marker exception for these tests: it is injected either as a failed cohort path or as a
// failed reply future, and several tests expect exactly this type to surface.
// NOTE(review): the class body is not visible in this listing; the "serial" suppression
// presumably exists because no serialVersionUID is declared - confirm.
46 @SuppressWarnings("serial")
47 static class TestException extends RuntimeException {
// Context handed to every ThreePhaseCommitCohortProxy built by this class. All
// interactions with it are stubbed/verified through Mockito (doReturn(...).when(...),
// verify(...)).
// NOTE(review): the line above this field is not visible in this listing; the
// initMocks(this) call in the fixture setup suggests it carries @Mock - confirm.
51 private ActorContext actorContext;
// NOTE(review): the header of the enclosing method is not visible in this listing; these
// two statements look like per-test fixture setup - confirm against the full file.
// Initialise the Mockito-annotated fields of this test instance.
55 MockitoAnnotations.initMocks(this);
// Stub getActorSystem() on the context so it returns the system provided by getSystem().
57 doReturn(getSystem()).when(actorContext).getActorSystem();
// Creates one cohort endpoint for the proxy under test and returns its path wrapped in an
// already-successful Future.
60 private Future<ActorPath> newCohortPath() {
// Spawn a DoNothingActor in the test actor system and take its path.
61 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
// Resolving that path through the context yields a mock ActorSelection.
62 doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
63 return Futures.successful(path);
// Builds a ThreePhaseCommitCohortProxy for transaction id "txn-1" backed by nCohorts
// cohort path futures, each produced by newCohortPath().
//
// nCohorts - number of cohort paths to create
// returns  - the proxy wired to the shared actorContext
66 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
67 List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
// Only the iteration count matters; i is not used in the visible loop body.
68 for(int i = 1; i <= nCohorts; i++) {
69 cohortPathFutures.add(newCohortPath());
72 return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
// Builds a proxy for "txn-1" with two cohort path futures: the first resolves normally,
// the second is already failed with a TestException. Used by the *WithFailedCohortPath
// tests.
// NOTE(review): the line between the signature and the body is not visible in this
// listing (possibly a throws clause) - confirm.
75 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
77 List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
78 cohortPathFutures.add(newCohortPath());
// Second cohort path can never be resolved.
79 cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
81 return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
// Scripts what the mocked ActorContext returns from executeRemoteOperationAsync for
// messages of the given request type.
//
// requestType - class matched (via isA) against the message argument
// responses   - answers chained in the order given; a Throwable becomes a failed Future,
//               anything else is cast to SerializableMessage and its toSerializable()
//               form is wrapped in a successful Future. At least one response is
//               required because responses[0] is read unconditionally.
84 private void setupMockActorContext(Class<?> requestType, Object... responses) {
// The first answer starts the stubbing chain.
85 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
86 .failed((Throwable) responses[0]) : Futures
87 .successful(((SerializableMessage) responses[0]).toSerializable()));
// Remaining answers are appended to the same chain.
89 for(int i = 1; i < responses.length; i++) {
90 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
91 .failed((Throwable) responses[i]) : Futures
92 .successful(((SerializableMessage) responses[i]).toSerializable()));
// Bind the chain: any ActorSelection, any timeout, but only messages of requestType.
95 stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
96 isA(requestType), any(FiniteDuration.class));
// Verifies that executeRemoteOperationAsync was called on the mocked context exactly
// nCohorts times with a message of the given request type (any selection, any timeout).
// Despite its name, nCohorts is the total expected call count: testCanCommitWithOneCohort
// passes 2 for a single cohort that is queried twice.
99 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
100 verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
101 any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
// Waits up to 5 seconds for the future and fails the test if it completes normally.
// NOTE(review): the try opener and the body of the catch block are not visible in this
// listing. Judging by the method name and the `throws Throwable` clause, the catch
// presumably rethrows e.getCause() so callers can use @Test(expected = ...) on the
// underlying exception - confirm against the full file.
104 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
107 future.get(5, TimeUnit.SECONDS);
// Reaching this line means the future did not fail.
108 fail("Expected ExecutionException");
109 } catch(ExecutionException e) {
// With a single cohort, canCommit() mirrors that cohort's reply: true on the first call,
// false after the stub has been re-scripted.
115 public void testCanCommitWithOneCohort() throws Exception {
117 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
119 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
120 new CanCommitTransactionReply(true));
122 ListenableFuture<Boolean> future = proxy.canCommit();
124 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
// Re-stub the same request type so the next call receives a negative reply.
126 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
127 new CanCommitTransactionReply(false));
129 future = proxy.canCommit();
131 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
// Two canCommit() calls against one cohort: two remote invocations in total.
133 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
// Two cohorts that both reply true: canCommit() must yield true and the context must
// have been invoked twice for CanCommitTransaction.
137 public void testCanCommitWithMultipleCohorts() throws Exception {
139 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
141 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
142 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
144 ListenableFuture<Boolean> future = proxy.canCommit();
146 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
148 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
// Three cohorts where the second scripted reply is false: the combined canCommit()
// result must be false.
152 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
154 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
156 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
157 new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
158 new CanCommitTransactionReply(true));
160 ListenableFuture<Boolean> future = proxy.canCommit();
162 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
// The test expects three remote invocations despite the negative reply.
164 verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
// The only cohort answers CanCommitTransaction with a failed future carrying a
// TestException. The test passes only if that TestException escapes this method, which
// relies on propagateExecutionExceptionCause surfacing the failure's cause.
167 @Test(expected = TestException.class)
168 public void testCanCommitWithExceptionFailure() throws Throwable {
170 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
172 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
174 propagateExecutionExceptionCause(proxy.canCommit());
// The cohort answers a CanCommitTransaction with a PreCommitTransactionReply, i.e. the
// wrong reply type. The canCommit() future is expected to fail, so get() throws
// ExecutionException.
177 @Test(expected = ExecutionException.class)
178 public void testCanCommitWithInvalidResponseType() throws Exception {
180 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
182 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
183 new PreCommitTransactionReply());
185 proxy.canCommit().get(5, TimeUnit.SECONDS);
// One of the proxy's cohort path futures is failed with a TestException; canCommit() is
// expected to fail with that exception and no CanCommitTransaction may have been sent.
// NOTE(review): lines are elided between the statements in this listing (the embedded
// numbers jump 191 -> 194 -> 196). As shown, the verify call follows a helper that is
// expected to throw; check the full file for a try/finally around these two statements.
188 @Test(expected = TestException.class)
189 public void testCanCommitWithFailedCohortPath() throws Throwable {
191 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
194 propagateExecutionExceptionCause(proxy.canCommit());
// Zero remote invocations expected for this request type.
196 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
// Single cohort replying with a PreCommitTransactionReply: preCommit() must complete
// within 5 seconds and exactly one PreCommitTransaction must have been sent.
201 public void testPreCommit() throws Exception {
202 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
204 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
205 new PreCommitTransactionReply());
207 proxy.preCommit().get(5, TimeUnit.SECONDS);
209 verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
// Two cohorts: the first scripted reply is a valid PreCommitTransactionReply, the second
// is a failed future (RuntimeException "mock"). The preCommit() future is expected to
// fail, so get() throws ExecutionException.
212 @Test(expected = ExecutionException.class)
213 public void testPreCommitWithFailure() throws Exception {
214 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
216 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
217 new PreCommitTransactionReply(), new RuntimeException("mock"));
219 proxy.preCommit().get(5, TimeUnit.SECONDS);
// Single cohort replying with an AbortTransactionReply: abort() must complete within
// 5 seconds and exactly one AbortTransaction must have been sent.
223 public void testAbort() throws Exception {
224 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
226 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
228 proxy.abort().get(5, TimeUnit.SECONDS);
230 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
// The cohort's reply to AbortTransaction is a failed future (RuntimeException "mock").
// Unlike the other phases, the test requires abort()'s future to complete normally.
234 public void testAbortWithFailure() throws Exception {
235 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
237 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
239 // The exception should not get propagated.
240 proxy.abort().get(5, TimeUnit.SECONDS);
// The abort message must still have been sent once.
242 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
// One cohort path future is failed with a TestException. abort() must still complete
// normally, and no AbortTransaction may have been sent through the context.
246 public void testAbortWithFailedCohortPath() throws Throwable {
248 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
250 // The exception should not get propagated.
251 proxy.abort().get(5, TimeUnit.SECONDS);
// Zero remote invocations expected for this request type.
253 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
// Two cohorts that both reply with a CommitTransactionReply: commit() must complete
// within 5 seconds and two CommitTransaction messages must have been sent.
257 public void testCommit() throws Exception {
259 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
261 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
262 new CommitTransactionReply());
264 proxy.commit().get(5, TimeUnit.SECONDS);
266 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
// Two cohorts: the first scripted reply is a valid CommitTransactionReply, the second is
// a failed future carrying a TestException. The test passes only if that TestException
// escapes this method, which relies on propagateExecutionExceptionCause surfacing the
// failure's cause.
269 @Test(expected = TestException.class)
270 public void testCommitWithFailure() throws Throwable {
272 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
274 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
275 new TestException());
277 propagateExecutionExceptionCause(proxy.commit());
// The cohort answers a CommitTransaction with a PreCommitTransactionReply, i.e. the wrong
// reply type. The commit() future is expected to fail, so get() throws
// ExecutionException.
280 @Test(expected = ExecutionException.class)
281 public void testCommitWithInvalidResponseType() throws Exception {
283 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
285 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
287 proxy.commit().get(5, TimeUnit.SECONDS);
// One of the proxy's cohort path futures is failed with a TestException; commit() is
// expected to fail with that exception and no CommitTransaction may have been sent.
// NOTE(review): lines are elided between the statements in this listing (the embedded
// numbers jump 293 -> 296 -> 298). As shown, the verify call follows a helper that is
// expected to throw; check the full file for a try/finally around these two statements.
290 @Test(expected = TestException.class)
291 public void testCommitWithFailedCohortPath() throws Throwable {
293 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
296 propagateExecutionExceptionCause(proxy.commit());
// Zero remote invocations expected for this request type.
298 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
// Runs canCommit, preCommit and commit in sequence against two cohorts that answer every
// phase successfully, then checks that each phase produced two remote invocations.
303 public void testAllThreePhasesSuccessful() throws Exception {
305 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
// Script two positive replies per phase, one stubbing per request type.
307 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
308 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
310 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
311 new PreCommitTransactionReply(), new PreCommitTransactionReply());
313 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
314 new CommitTransactionReply(), new CommitTransactionReply());
// Each phase must complete within 5 seconds before the next one starts.
316 proxy.canCommit().get(5, TimeUnit.SECONDS);
317 proxy.preCommit().get(5, TimeUnit.SECONDS);
318 proxy.commit().get(5, TimeUnit.SECONDS);
320 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
321 verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
322 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);