1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorSelection;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
8 import com.google.common.collect.Lists;
9 import com.google.common.util.concurrent.ListenableFuture;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.doReturn;
14 import static org.mockito.Mockito.any;
15 import static org.mockito.Mockito.isA;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.verify;
18 import static org.mockito.Mockito.times;
20 import org.junit.Before;
21 import org.junit.Test;
22 import org.mockito.Mock;
23 import org.mockito.MockitoAnnotations;
24 import org.mockito.stubbing.Stubber;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
37 import scala.concurrent.Future;
39 import java.util.List;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.TimeUnit;
43 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
// Unchecked marker exception that the tests inject as a simulated failure and then expect
// to see again (see the @Test(expected = TestException.class) cases in this class).
// "serial" is suppressed because no serialVersionUID is declared for this test-only type.
45 @SuppressWarnings("serial")
46 static class TestException extends RuntimeException {
// ActorContext handed to every proxy under test; the helpers in this class stub and verify
// its methods through Mockito.
50 private ActorContext actorContext;
// Initializes any Mockito-annotated fields declared on this test instance.
54 MockitoAnnotations.initMocks(this);
// Make the context report the test actor system returned by getSystem().
56 doReturn(getSystem()).when(actorContext).getActorSystem();
// Creates a DoNothingActor in the test actor system, stubs actorContext.actorSelection(path)
// to return a mock ActorSelection for that actor's path, and returns the path wrapped in an
// already-successful Future.
59 private Future<ActorPath> newCohortPath() {
60 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
61 doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
62 return Futures.successful(path);
// Builds a ThreePhaseCommitCohortProxy for transaction id "txn-1" backed by nCohorts cohort
// path futures, each produced by newCohortPath() (so each one is already resolved).
65 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
66 List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
67 for(int i = 1; i <= nCohorts; i++) {
68 cohortPathFutures.add(newCohortPath());
71 return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
// Builds a ThreePhaseCommitCohortProxy for transaction id "txn-1" with two cohort path
// futures: one that resolves successfully and one that has already failed with TestException.
74 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
76 List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
77 cohortPathFutures.add(newCohortPath());
// Second cohort: its path lookup itself fails.
78 cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
80 return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
// Stubs actorContext.executeOperationAsync(...) to hand back the given responses, one per
// consecutive call, in the order supplied. A Throwable response becomes a failed Future; any
// other response is cast to SerializableMessage and its toSerializable() form is wrapped in a
// successful Future. At least one response is required (responses[0] is read unconditionally).
83 private void setupMockActorContext(Class<?> requestType, Object... responses) {
// First response starts the stubbing chain.
84 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
85 .failed((Throwable) responses[0]) : Futures
86 .successful(((SerializableMessage) responses[0]).toSerializable()));
// Remaining responses are appended so later calls receive them in sequence.
88 for(int i = 1; i < responses.length; i++) {
89 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
90 .failed((Throwable) responses[i]) : Futures
91 .successful(((SerializableMessage) responses[i]).toSerializable()));
// NOTE(review): the first argument matches any ActorSelection; the second matcher is
// presumably keyed on requestType - confirm against the rest of this statement.
94 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
// Asserts that actorContext.executeOperationAsync was called exactly nCohorts times with any
// ActorSelection and a message that is an instance of requestType.
98 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
99 verify(actorContext, times(nCohorts)).executeOperationAsync(
100 any(ActorSelection.class), isA(requestType));
// Waits up to 5 seconds for the future and requires it to fail: a normal return triggers
// fail("Expected ExecutionException").
// NOTE(review): going by the method name, the ExecutionException handler presumably rethrows
// the exception's cause - confirm against the handler body.
103 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
106 future.get(5, TimeUnit.SECONDS);
107 fail("Expected ExecutionException");
108 } catch(ExecutionException e) {
// With a single cohort, canCommit() must mirror the cohort's reply: true when the cohort
// answers true, false when it answers false.
114 public void testCanCommitWithOneCohort() throws Exception {
116 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
// Round 1: cohort replies canCommit=true.
118 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
119 new CanCommitTransactionReply(true));
121 ListenableFuture<Boolean> future = proxy.canCommit();
123 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
// Round 2: re-stub so the cohort replies canCommit=false.
125 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
126 new CanCommitTransactionReply(false));
128 future = proxy.canCommit();
130 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
// Two canCommit() calls against one cohort -> two messages in total.
132 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
// With two cohorts that both reply true, canCommit() must yield true and each cohort must
// have been sent one CanCommitTransaction message.
136 public void testCanCommitWithMultipleCohorts() throws Exception {
138 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
140 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
141 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
143 ListenableFuture<Boolean> future = proxy.canCommit();
145 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
147 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
// With three cohorts replying true, false, true, canCommit() must yield false; the test also
// expects three CanCommitTransaction messages to have been sent.
151 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
153 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
155 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
156 new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
157 new CanCommitTransactionReply(true));
159 ListenableFuture<Boolean> future = proxy.canCommit();
161 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
163 verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
// When the cohort's canCommit operation fails with TestException, the test expects that
// TestException to come out of propagateExecutionExceptionCause(proxy.canCommit()).
166 @Test(expected = TestException.class)
167 public void testCanCommitWithExceptionFailure() throws Throwable {
169 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
171 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
173 propagateExecutionExceptionCause(proxy.canCommit());
// A cohort answering a canCommit request with the wrong reply type (PreCommitTransactionReply)
// must make the canCommit() future fail with ExecutionException.
176 @Test(expected = ExecutionException.class)
177 public void testCanCommitWithInvalidResponseType() throws Exception {
179 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
// Deliberately mismatched reply type for the request being stubbed.
181 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
182 new PreCommitTransactionReply());
184 proxy.canCommit().get(5, TimeUnit.SECONDS);
// When one cohort path future has failed with TestException, the test expects TestException
// from canCommit() and expects that no CanCommitTransaction message was sent to any cohort.
// NOTE(review): confirm that the zero-invocation verification still runs after the
// propagate call throws.
187 @Test(expected = TestException.class)
188 public void testCanCommitWithFailedCohortPath() throws Throwable {
190 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
193 propagateExecutionExceptionCause(proxy.canCommit());
195 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
// With one cohort replying PreCommitTransactionReply, preCommit() must complete within
// 5 seconds and exactly one PreCommitTransaction message must have been sent.
200 public void testPreCommit() throws Exception {
201 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
203 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
204 new PreCommitTransactionReply());
206 proxy.preCommit().get(5, TimeUnit.SECONDS);
208 verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
// With two cohorts where the first replies normally and the second fails with
// RuntimeException("mock"), the preCommit() future must fail with ExecutionException.
211 @Test(expected = ExecutionException.class)
212 public void testPreCommitWithFailure() throws Exception {
213 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
215 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
216 new PreCommitTransactionReply(), new RuntimeException("mock"));
218 proxy.preCommit().get(5, TimeUnit.SECONDS);
// With one cohort replying AbortTransactionReply, abort() must complete within 5 seconds and
// exactly one AbortTransaction message must have been sent.
222 public void testAbort() throws Exception {
223 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
225 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
227 proxy.abort().get(5, TimeUnit.SECONDS);
229 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
// When the cohort's abort operation fails with RuntimeException("mock"), the abort() future
// must still complete normally, and one AbortTransaction message must have been sent.
233 public void testAbortWithFailure() throws Exception {
234 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
236 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
238 // The exception should not get propagated.
239 proxy.abort().get(5, TimeUnit.SECONDS);
241 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
// When one cohort path future has failed, abort() must still complete normally and no
// AbortTransaction message may have been sent to any cohort.
245 public void testAbortWithFailedCohortPath() throws Throwable {
247 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
249 // The exception should not get propagated.
250 proxy.abort().get(5, TimeUnit.SECONDS);
252 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
// With two cohorts that both reply CommitTransactionReply, commit() must complete within
// 5 seconds and two CommitTransaction messages must have been sent.
256 public void testCommit() throws Exception {
258 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
260 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
261 new CommitTransactionReply());
263 proxy.commit().get(5, TimeUnit.SECONDS);
265 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
// With two cohorts where the first replies normally and the second fails with TestException,
// the test expects TestException from propagateExecutionExceptionCause(proxy.commit()).
268 @Test(expected = TestException.class)
269 public void testCommitWithFailure() throws Throwable {
271 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
273 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
274 new TestException());
276 propagateExecutionExceptionCause(proxy.commit());
// A cohort answering a commit request with the wrong reply type (PreCommitTransactionReply)
// must make the commit() future fail with ExecutionException.
279 @Test(expected = ExecutionException.class)
280 public void testCommitWithInvalidResponseType() throws Exception {
282 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
// Deliberately mismatched reply type for the request being stubbed.
284 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
286 proxy.commit().get(5, TimeUnit.SECONDS);
// When one cohort path future has failed with TestException, the test expects TestException
// from commit() and expects that no CommitTransaction message was sent to any cohort.
// NOTE(review): confirm that the zero-invocation verification still runs after the
// propagate call throws.
289 @Test(expected = TestException.class)
290 public void testCommitWithFailedCohortPath() throws Throwable {
292 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
295 propagateExecutionExceptionCause(proxy.commit());
297 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
// Happy path across all phases: with two cohorts replying successfully to canCommit,
// preCommit and commit, each phase must complete within 5 seconds and each phase must have
// sent two messages of its own request type.
302 public void testAllThreePhasesSuccessful() throws Exception {
304 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
// Stub replies for all three phases before driving the proxy.
306 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
307 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
309 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
310 new PreCommitTransactionReply(), new PreCommitTransactionReply());
312 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
313 new CommitTransactionReply(), new CommitTransactionReply());
// Run the phases in protocol order, blocking on each before starting the next.
315 proxy.canCommit().get(5, TimeUnit.SECONDS);
316 proxy.preCommit().get(5, TimeUnit.SECONDS);
317 proxy.commit().get(5, TimeUnit.SECONDS);
319 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
320 verifyCohortInvocations(2, PreCommitTransaction.SERIALIZABLE_CLASS);
321 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);