1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorSelection;
5 import akka.actor.Props;
6 import akka.dispatch.Futures;
8 import com.google.common.collect.Lists;
9 import com.google.common.util.concurrent.ListenableFuture;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.Mockito.doReturn;
13 import static org.mockito.Mockito.any;
14 import static org.mockito.Mockito.isA;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.verify;
17 import static org.mockito.Mockito.times;
19 import org.junit.Before;
20 import org.junit.Test;
21 import org.mockito.Mock;
22 import org.mockito.MockitoAnnotations;
23 import org.mockito.stubbing.Stubber;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
34 import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
35 import scala.concurrent.duration.FiniteDuration;
37 import java.util.List;
38 import java.util.concurrent.ExecutionException;
40 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
43 private ActorContext actorContext;
47 MockitoAnnotations.initMocks(this);
49 doReturn(getSystem()).when(actorContext).getActorSystem();
52 private ThreePhaseCommitCohortProxy setupProxy(int nCohorts) {
53 List<ActorPath> cohorts = Lists.newArrayList();
54 for(int i = 1; i <= nCohorts; i++) {
55 ActorPath path = getSystem().actorOf(Props.create(MessageCollectorActor.class)).path();
57 doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
60 return new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
63 private void setupMockActorContext(Class<?> requestType, Object... responses) {
64 Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
65 .failed((Throwable) responses[0]) : Futures
66 .successful(((SerializableMessage) responses[0]).toSerializable()));
68 for(int i = 1; i < responses.length; i++) {
69 stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
70 .failed((Throwable) responses[i]) : Futures
71 .successful(((SerializableMessage) responses[i]).toSerializable()));
74 stubber.when(actorContext).executeRemoteOperationAsync(any(ActorSelection.class),
75 isA(requestType), any(FiniteDuration.class));
78 private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
79 verify(actorContext, times(nCohorts)).executeRemoteOperationAsync(
80 any(ActorSelection.class), isA(requestType), any(FiniteDuration.class));
84 public void testCanCommitWithOneCohort() throws Exception {
86 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
88 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
89 new CanCommitTransactionReply(true));
91 ListenableFuture<Boolean> future = proxy.canCommit();
93 assertEquals("canCommit", true, future.get());
95 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
96 new CanCommitTransactionReply(false));
98 future = proxy.canCommit();
100 assertEquals("canCommit", false, future.get());
102 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
106 public void testCanCommitWithMultipleCohorts() throws Exception {
108 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
110 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
111 new CanCommitTransactionReply(true), new CanCommitTransactionReply(true));
113 ListenableFuture<Boolean> future = proxy.canCommit();
115 assertEquals("canCommit", true, future.get());
117 verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
121 public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
123 ThreePhaseCommitCohortProxy proxy = setupProxy(3);
125 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
126 new CanCommitTransactionReply(true), new CanCommitTransactionReply(false),
127 new CanCommitTransactionReply(true));
129 ListenableFuture<Boolean> future = proxy.canCommit();
131 assertEquals("canCommit", false, future.get());
133 verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
136 @Test(expected = ExecutionException.class)
137 public void testCanCommitWithExceptionFailure() throws Exception {
139 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
141 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
143 proxy.canCommit().get();
146 @Test(expected = ExecutionException.class)
147 public void testCanCommitWithInvalidResponseType() throws Exception {
149 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
151 setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
152 new PreCommitTransactionReply());
154 proxy.canCommit().get();
158 public void testPreCommit() throws Exception {
159 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
161 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
162 new PreCommitTransactionReply());
164 proxy.preCommit().get();
166 verifyCohortInvocations(1, PreCommitTransaction.SERIALIZABLE_CLASS);
169 @Test(expected = ExecutionException.class)
170 public void testPreCommitWithFailure() throws Exception {
171 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
173 setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
174 new PreCommitTransactionReply(), new RuntimeException("mock"));
176 proxy.preCommit().get();
180 public void testAbort() throws Exception {
181 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
183 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
187 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
191 public void testAbortWithFailure() throws Exception {
192 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
194 setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
196 // The exception should not get propagated.
199 verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
203 public void testCommit() throws Exception {
205 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
207 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
208 new CommitTransactionReply());
210 proxy.commit().get();
212 verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
215 @Test(expected = ExecutionException.class)
216 public void testCommitWithFailure() throws Exception {
218 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
220 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
221 new RuntimeException("mock"));
223 proxy.commit().get();
226 @Test(expected = ExecutionException.class)
227 public void teseCommitWithInvalidResponseType() throws Exception {
229 ThreePhaseCommitCohortProxy proxy = setupProxy(1);
231 setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
233 proxy.commit().get();
237 public void testGetCohortPaths() {
239 ThreePhaseCommitCohortProxy proxy = setupProxy(2);
241 List<ActorPath> paths = proxy.getCohortPaths();
242 assertNotNull("getCohortPaths returned null", paths);
243 assertEquals("getCohortPaths size", 2, paths.size());