0a2a0d1bc0595a8932f2e2ee3ab27c6b8f422855
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.anyLong;
7 import static org.mockito.Matchers.isA;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.timeout;
11 import static org.mockito.Mockito.times;
12 import static org.mockito.Mockito.verify;
13 import akka.actor.ActorPath;
14 import akka.actor.ActorSelection;
15 import akka.actor.Props;
16 import akka.dispatch.Futures;
17 import akka.util.Timeout;
18 import com.codahale.metrics.Snapshot;
19 import com.codahale.metrics.Timer;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import java.util.List;
23 import java.util.concurrent.ExecutionException;
24 import java.util.concurrent.TimeUnit;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mock;
28 import org.mockito.MockitoAnnotations;
29 import org.mockito.stubbing.Stubber;
30 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
41 import scala.concurrent.Future;
42
43 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
44
45     @SuppressWarnings("serial")
46     static class TestException extends RuntimeException {
47     }
48
49     @Mock
50     private ActorContext actorContext;
51
52     @Mock
53     private DatastoreContext datastoreContext;
54
55     @Mock
56     private Timer commitTimer;
57
58     @Mock
59     private Timer.Context commitTimerContext;
60
61     @Mock
62     private Snapshot commitSnapshot;
63
64     @Before
65     public void setUp() {
66         MockitoAnnotations.initMocks(this);
67
68         doReturn(getSystem()).when(actorContext).getActorSystem();
69         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
70         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
71         doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
72         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
73         doReturn(commitTimerContext).when(commitTimer).time();
74         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
75         doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile();
76         doReturn(10.0).when(actorContext).getTxCreationLimit();
77     }
78
79     private Future<ActorSelection> newCohort() {
80         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
81         ActorSelection actorSelection = getSystem().actorSelection(path);
82         return Futures.successful(actorSelection);
83     }
84
85     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
86         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
87         for(int i = 1; i <= nCohorts; i++) {
88             cohortFutures.add(newCohort());
89         }
90
91         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
92     }
93
94     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
95             throws Exception {
96         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
97         cohortFutures.add(newCohort());
98         cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
99
100         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
101     }
102
103     private void setupMockActorContext(Class<?> requestType, Object... responses) {
104         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
105                 .failed((Throwable) responses[0]) : Futures
106                 .successful(((SerializableMessage) responses[0]).toSerializable()));
107
108         for(int i = 1; i < responses.length; i++) {
109             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
110                     .failed((Throwable) responses[i]) : Futures
111                     .successful(((SerializableMessage) responses[i]).toSerializable()));
112         }
113
114         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
115                 isA(requestType), any(Timeout.class));
116     }
117
118     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
119         verify(actorContext, times(nCohorts)).executeOperationAsync(
120                 any(ActorSelection.class), isA(requestType), any(Timeout.class));
121     }
122
123     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
124
125         try {
126             future.get(5, TimeUnit.SECONDS);
127             fail("Expected ExecutionException");
128         } catch(ExecutionException e) {
129             throw e.getCause();
130         }
131     }
132
133     @Test
134     public void testCanCommitWithOneCohort() throws Exception {
135
136         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
137
138         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
139                 CanCommitTransactionReply.YES);
140
141         ListenableFuture<Boolean> future = proxy.canCommit();
142
143         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
144
145         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
146                 CanCommitTransactionReply.NO);
147
148         future = proxy.canCommit();
149
150         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
151
152         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
153     }
154
155     @Test
156     public void testCanCommitWithMultipleCohorts() throws Exception {
157
158         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
159
160         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
161                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
162
163         ListenableFuture<Boolean> future = proxy.canCommit();
164
165         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
166
167         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
168     }
169
170     @Test
171     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
172
173         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
174
175         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
176                 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
177
178         ListenableFuture<Boolean> future = proxy.canCommit();
179
180         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
181
182         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
183     }
184
185     @Test(expected = TestException.class)
186     public void testCanCommitWithExceptionFailure() throws Throwable {
187
188         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
189
190         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
191
192         propagateExecutionExceptionCause(proxy.canCommit());
193     }
194
195     @Test(expected = ExecutionException.class)
196     public void testCanCommitWithInvalidResponseType() throws Exception {
197
198         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
199
200         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
201                 new PreCommitTransactionReply());
202
203         proxy.canCommit().get(5, TimeUnit.SECONDS);
204     }
205
206     @Test(expected = TestException.class)
207     public void testCanCommitWithFailedCohortPath() throws Throwable {
208
209         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
210
211         try {
212             propagateExecutionExceptionCause(proxy.canCommit());
213         } finally {
214             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
215         }
216     }
217
218     @Test
219     public void testPreCommit() throws Exception {
220         // Precommit is currently a no-op
221         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
222
223         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
224                 new PreCommitTransactionReply());
225
226         proxy.preCommit().get(5, TimeUnit.SECONDS);
227     }
228
229     @Test
230     public void testAbort() throws Exception {
231         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
232
233         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
234
235         proxy.abort().get(5, TimeUnit.SECONDS);
236
237         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
238     }
239
240     @Test
241     public void testAbortWithFailure() throws Exception {
242         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
243
244         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
245
246         // The exception should not get propagated.
247         proxy.abort().get(5, TimeUnit.SECONDS);
248
249         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
250     }
251
252     @Test
253     public void testAbortWithFailedCohortPath() throws Throwable {
254
255         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
256
257         // The exception should not get propagated.
258         proxy.abort().get(5, TimeUnit.SECONDS);
259
260         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
261     }
262
263     @Test
264     public void testCommit() throws Exception {
265
266         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
267
268         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
269                 new CommitTransactionReply());
270
271         proxy.commit().get(5, TimeUnit.SECONDS);
272
273         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
274     }
275
276     @Test(expected = TestException.class)
277     public void testCommitWithFailure() throws Throwable {
278
279         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
280
281         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
282                 new TestException());
283
284         propagateExecutionExceptionCause(proxy.commit());
285     }
286
287     @Test(expected = ExecutionException.class)
288     public void testCommitWithInvalidResponseType() throws Exception {
289
290         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
291
292         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
293
294         proxy.commit().get(5, TimeUnit.SECONDS);
295     }
296
297     @Test(expected = TestException.class)
298     public void testCommitWithFailedCohortPath() throws Throwable {
299
300         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
301
302         try {
303             propagateExecutionExceptionCause(proxy.commit());
304         } finally {
305
306             verify(actorContext, never()).setTxCreationLimit(anyLong());
307             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
308         }
309
310     }
311
312     @Test
313     public void testAllThreePhasesSuccessful() throws Exception {
314
315         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
316
317         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
318                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
319
320         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
321                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
322
323         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
324                 new CommitTransactionReply(), new CommitTransactionReply());
325
326         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
327
328         proxy.canCommit().get(5, TimeUnit.SECONDS);
329         proxy.preCommit().get(5, TimeUnit.SECONDS);
330         proxy.commit().get(5, TimeUnit.SECONDS);
331
332         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
333         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
334
335         // Verify that the creation limit was changed to 0.5 (based on setup)
336         verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
337     }
338
339     @Test
340     public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
341
342         ThreePhaseCommitCohortProxy proxy = setupProxy(0);
343
344         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
345
346         proxy.canCommit().get(5, TimeUnit.SECONDS);
347         proxy.preCommit().get(5, TimeUnit.SECONDS);
348         proxy.commit().get(5, TimeUnit.SECONDS);
349
350         verify(actorContext, never()).setTxCreationLimit(anyLong());
351     }
352 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.