1 package org.opendaylight.controller.cluster.datastore;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.isA;
7 import static org.mockito.Mockito.doReturn;
8 import static org.mockito.Mockito.times;
9 import static org.mockito.Mockito.verify;
10 import akka.actor.ActorPath;
11 import akka.actor.ActorSelection;
12 import akka.actor.Props;
13 import akka.dispatch.Futures;
14 import com.google.common.collect.Lists;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.List;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.TimeUnit;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.Mock;
22 import org.mockito.MockitoAnnotations;
23 import org.mockito.stubbing.Stubber;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
34 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
35 import scala.concurrent.Future;
37 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
39 @SuppressWarnings("serial")
40 static class TestException extends RuntimeException {
44 private ActorContext actorContext;
48 MockitoAnnotations.initMocks(this);
50 doReturn(getSystem()).when(actorContext).getActorSystem();
53 private Future<ActorSelection> newCohort() {
54 ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
55 ActorSelection actorSelection = getSystem().actorSelection(path);
56 return Futures.successful(actorSelection);
59 private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
60 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
61 for(int i = 1; i <= nCohorts; i++) {
62 cohortFutures.add(newCohort());
65 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
68 private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
70 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
71 cohortFutures.add(newCohort());
72 cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
74 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
77 private void setupMockActorContext(Class<?> requestType, Object... responses) {
78 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
79 .failed((Throwable) responses[0]) : Futures
80 .successful(((SerializableMessage) responses[0]).toSerializable()));
82 for(int i = 1; i < responses.length; i++) {
83 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
84 .failed((Throwable) responses[i]) : Futures
85 .successful(((SerializableMessage) responses[i]).toSerializable()));
88 stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
92 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
93 verify(actorContext, times(nCohorts)).executeOperationAsync(
94 any(ActorSelection.class), isA(requestType));
97 private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
100 future.get(5, TimeUnit.SECONDS);
101 fail("Expected ExecutionException");
102 } catch(ExecutionException e) {
108 public void testCanCommitWithOneCohort() throws Exception {
110 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
112 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
113 CanCommitTransactionReply.YES);
115 ListenableFuture<Boolean> future = proxy.canCommit();
117 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
119 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
120 CanCommitTransactionReply.NO);
122 future = proxy.canCommit();
124 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
126 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
130 public void testCanCommitWithMultipleCohorts() throws Exception {
132 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
134 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
135 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
137 ListenableFuture<Boolean> future = proxy.canCommit();
139 assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
141 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
145 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
147 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
149 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
150 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
152 ListenableFuture<Boolean> future = proxy.canCommit();
154 assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
156 verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
159 @Test(expected = TestException.class)
160 public void testCanCommitWithExceptionFailure() throws Throwable {
162 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
164 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
166 propagateExecutionExceptionCause(proxy.canCommit());
169 @Test(expected = ExecutionException.class)
170 public void testCanCommitWithInvalidResponseType() throws Exception {
172 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
174 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
175 new PreCommitTransactionReply());
177 proxy.canCommit().get(5, TimeUnit.SECONDS);
180 @Test(expected = TestException.class)
181 public void testCanCommitWithFailedCohortPath() throws Throwable {
183 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
186 propagateExecutionExceptionCause(proxy.canCommit());
188 verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
193 public void testPreCommit() throws Exception {
194 // Precommit is currently a no-op
195 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
197 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
198 new PreCommitTransactionReply());
200 proxy.preCommit().get(5, TimeUnit.SECONDS);
204 public void testAbort() throws Exception {
205 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
207 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
209 proxy.abort().get(5, TimeUnit.SECONDS);
211 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
215 public void testAbortWithFailure() throws Exception {
216 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
218 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
220 // The exception should not get propagated.
221 proxy.abort().get(5, TimeUnit.SECONDS);
223 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
227 public void testAbortWithFailedCohortPath() throws Throwable {
229 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
231 // The exception should not get propagated.
232 proxy.abort().get(5, TimeUnit.SECONDS);
234 verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
238 public void testCommit() throws Exception {
240 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
242 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
243 new CommitTransactionReply());
245 proxy.commit().get(5, TimeUnit.SECONDS);
247 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
250 @Test(expected = TestException.class)
251 public void testCommitWithFailure() throws Throwable {
253 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
255 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
256 new TestException());
258 propagateExecutionExceptionCause(proxy.commit());
261 @Test(expected = ExecutionException.class)
262 public void testCommitWithInvalidResponseType() throws Exception {
264 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
266 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
268 proxy.commit().get(5, TimeUnit.SECONDS);
271 @Test(expected = TestException.class)
272 public void testCommitWithFailedCohortPath() throws Throwable {
274 ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
277 propagateExecutionExceptionCause(proxy.commit());
279 verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
284 public void testAllThreePhasesSuccessful() throws Exception {
286 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
288 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
289 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
291 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
292 new PreCommitTransactionReply(), new PreCommitTransactionReply());
294 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
295 new CommitTransactionReply(), new CommitTransactionReply());
297 proxy.canCommit().get(5, TimeUnit.SECONDS);
298 proxy.preCommit().get(5, TimeUnit.SECONDS);
299 proxy.commit().get(5, TimeUnit.SECONDS);
301 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
302 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);