1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorSelection;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
7 import com.google.common.collect.Lists;
8 import com.google.common.util.concurrent.ListenableFuture;
9 import org.junit.Before;
10 import org.junit.Test;
11 import org.mockito.Mock;
12 import org.mockito.MockitoAnnotations;
13 import org.mockito.stubbing.Stubber;
14 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
15 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
16 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
18 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
24 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
25 import scala.concurrent.Future;
27 import java.util.List;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
31 import static org.junit.Assert.assertEquals;
32 import static org.junit.Assert.fail;
33 import static org.mockito.Mockito.any;
34 import static org.mockito.Mockito.doReturn;
35 import static org.mockito.Mockito.isA;
36 import static org.mockito.Mockito.times;
37 import static org.mockito.Mockito.verify;
39 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
41 @SuppressWarnings("serial")
42 static class TestException extends RuntimeException {
46 private ActorContext actorContext;
50 MockitoAnnotations.initMocks(this);
52 doReturn(getSystem()).when(actorContext).getActorSystem();
55 private Future<ActorSelection> newCohort() {
56 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
57 ActorSelection actorSelection = getSystem().actorSelection(path);
58 return Futures.successful(actorSelection);
61 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
62 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
63 for(int i = 1; i <= nCohorts; i++) {
64 cohortFutures.add(newCohort());
67 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
70 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
72 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
73 cohortFutures.add(newCohort());
74 cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
76 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
79 private void setupMockActorContext(Class<?> requestType, Object... responses) {
80 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
81 .failed((Throwable) responses[0]) : Futures
82 .successful(((SerializableMessage) responses[0]).toSerializable()));
84 for(int i = 1; i < responses.length; i++) {
85 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
86 .failed((Throwable) responses[i]) : Futures
87 .successful(((SerializableMessage) responses[i]).toSerializable()));
90 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
94 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
95 verify(actorContext, times(nCohorts)).executeOperationAsync(
96 any(ActorSelection.class), isA(requestType));
99 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
102 future.get(5, TimeUnit.SECONDS);
103 fail("Expected ExecutionException");
104 } catch(ExecutionException e) {
110 public void testCanCommitWithOneCohort() throws Exception {
112 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
114 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
115 new CanCommitTransactionReply(true));
117 ListenableFuture<Boolean> future = proxy.canCommit();
119 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
121 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
122 new CanCommitTransactionReply(false));
124 future = proxy.canCommit();
126 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
128 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
132 public void testCanCommitWithMultipleCohorts() throws Exception {
134 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
136 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
137 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
139 ListenableFuture<Boolean> future = proxy.canCommit();
141 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
143 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
147 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
149 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
151 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
152 new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
153 new CanCommitTransactionReply(true));
155 ListenableFuture<Boolean> future = proxy.canCommit();
157 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
159 verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
162 @Test(expected = TestException.class)
163 public void testCanCommitWithExceptionFailure() throws Throwable {
165 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
167 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
169 propagateExecutionExceptionCause(proxy.canCommit());
172 @Test(expected = ExecutionException.class)
173 public void testCanCommitWithInvalidResponseType() throws Exception {
175 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
177 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
178 new PreCommitTransactionReply());
180 proxy.canCommit().get(5, TimeUnit.SECONDS);
183 @Test(expected = TestException.class)
184 public void testCanCommitWithFailedCohortPath() throws Throwable {
186 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
189 propagateExecutionExceptionCause(proxy.canCommit());
191 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
196 public void testPreCommit() throws Exception {
197 // Precommit is currently a no-op
198 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
200 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
201 new PreCommitTransactionReply());
203 proxy.preCommit().get(5, TimeUnit.SECONDS);
207 public void testAbort() throws Exception {
208 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
210 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
212 proxy.abort().get(5, TimeUnit.SECONDS);
214 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
218 public void testAbortWithFailure() throws Exception {
219 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
221 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
223 // The exception should not get propagated.
224 proxy.abort().get(5, TimeUnit.SECONDS);
226 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
230 public void testAbortWithFailedCohortPath() throws Throwable {
232 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
234 // The exception should not get propagated.
235 proxy.abort().get(5, TimeUnit.SECONDS);
237 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
241 public void testCommit() throws Exception {
243 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
245 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
246 new CommitTransactionReply());
248 proxy.commit().get(5, TimeUnit.SECONDS);
250 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
253 @Test(expected = TestException.class)
254 public void testCommitWithFailure() throws Throwable {
256 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
258 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
259 new TestException());
261 propagateExecutionExceptionCause(proxy.commit());
264 @Test(expected = ExecutionException.class)
265 public void testCommitWithInvalidResponseType() throws Exception {
267 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
269 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
271 proxy.commit().get(5, TimeUnit.SECONDS);
274 @Test(expected = TestException.class)
275 public void testCommitWithFailedCohortPath() throws Throwable {
277 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
280 propagateExecutionExceptionCause(proxy.commit());
282 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
287 public void testAllThreePhasesSuccessful() throws Exception {
289 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
291 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
292 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
294 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
295 new PreCommitTransactionReply(), new PreCommitTransactionReply());
297 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
298 new CommitTransactionReply(), new CommitTransactionReply());
300 proxy.canCommit().get(5, TimeUnit.SECONDS);
301 proxy.preCommit().get(5, TimeUnit.SECONDS);
302 proxy.commit().get(5, TimeUnit.SECONDS);
304 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
305 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);