Fix license header violations in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.isA;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.times;
17 import static org.mockito.Mockito.verify;
18 import akka.actor.ActorPath;
19 import akka.actor.ActorSelection;
20 import akka.actor.Props;
21 import akka.dispatch.Futures;
22 import akka.util.Timeout;
23 import com.codahale.metrics.Snapshot;
24 import com.codahale.metrics.Timer;
25 import com.google.common.collect.Lists;
26 import com.google.common.util.concurrent.ListenableFuture;
27 import java.util.List;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mock;
33 import org.mockito.MockitoAnnotations;
34 import org.mockito.stubbing.Stubber;
35 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
44 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
45 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
46 import scala.concurrent.Future;
47 import scala.concurrent.duration.Duration;
48
49 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
50
51     @SuppressWarnings("serial")
52     static class TestException extends RuntimeException {
53     }
54
55     @Mock
56     private ActorContext actorContext;
57
58     @Mock
59     private DatastoreContext datastoreContext;
60
61     @Mock
62     private Timer commitTimer;
63
64     @Mock
65     private Timer.Context commitTimerContext;
66
67     @Mock
68     private Snapshot commitSnapshot;
69
70     @Before
71     public void setUp() {
72         MockitoAnnotations.initMocks(this);
73
74         doReturn(getSystem()).when(actorContext).getActorSystem();
75         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
76         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
77         doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
78         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
79         doReturn(commitTimerContext).when(commitTimer).time();
80         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
81         for(int i=1;i<11;i++){
82             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
83             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
84             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
85         }
86         doReturn(10.0).when(actorContext).getTxCreationLimit();
87     }
88
89     private Future<ActorSelection> newCohort() {
90         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
91         ActorSelection actorSelection = getSystem().actorSelection(path);
92         return Futures.successful(actorSelection);
93     }
94
95     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
96         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
97         for(int i = 1; i <= nCohorts; i++) {
98             cohortFutures.add(newCohort());
99         }
100
101         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
102     }
103
104     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
105             throws Exception {
106         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
107         cohortFutures.add(newCohort());
108         cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
109
110         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
111     }
112
113     private void setupMockActorContext(Class<?> requestType, Object... responses) {
114         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
115                 .failed((Throwable) responses[0]) : Futures
116                 .successful(((SerializableMessage) responses[0]).toSerializable()));
117
118         for(int i = 1; i < responses.length; i++) {
119             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
120                     .failed((Throwable) responses[i]) : Futures
121                     .successful(((SerializableMessage) responses[i]).toSerializable()));
122         }
123
124         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
125                 isA(requestType), any(Timeout.class));
126
127         doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
128                 .when(actorContext).getTransactionCommitOperationTimeout();
129     }
130
131     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
132         verify(actorContext, times(nCohorts)).executeOperationAsync(
133                 any(ActorSelection.class), isA(requestType), any(Timeout.class));
134     }
135
136     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
137
138         try {
139             future.get(5, TimeUnit.SECONDS);
140             fail("Expected ExecutionException");
141         } catch(ExecutionException e) {
142             throw e.getCause();
143         }
144     }
145
146     @Test
147     public void testCanCommitWithOneCohort() throws Exception {
148
149         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
150
151         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
152                 CanCommitTransactionReply.YES);
153
154         ListenableFuture<Boolean> future = proxy.canCommit();
155
156         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
157
158         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
159                 CanCommitTransactionReply.NO);
160
161         future = proxy.canCommit();
162
163         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
164
165         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
166     }
167
168     @Test
169     public void testCanCommitWithMultipleCohorts() throws Exception {
170
171         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
172
173         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
174                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
175
176         ListenableFuture<Boolean> future = proxy.canCommit();
177
178         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
179
180         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
181     }
182
183     @Test
184     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
185
186         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
187
188         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
189                 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
190
191         ListenableFuture<Boolean> future = proxy.canCommit();
192
193         Boolean actual = future.get(5, TimeUnit.SECONDS);
194
195         assertEquals("canCommit", false, actual);
196
197         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
198     }
199
200     @Test(expected = TestException.class)
201     public void testCanCommitWithExceptionFailure() throws Throwable {
202
203         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
204
205         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
206
207         propagateExecutionExceptionCause(proxy.canCommit());
208     }
209
210     @Test(expected = ExecutionException.class)
211     public void testCanCommitWithInvalidResponseType() throws Exception {
212
213         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
214
215         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
216                 new PreCommitTransactionReply());
217
218         proxy.canCommit().get(5, TimeUnit.SECONDS);
219     }
220
221     @Test(expected = TestException.class)
222     public void testCanCommitWithFailedCohortPath() throws Throwable {
223
224         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
225
226         try {
227             propagateExecutionExceptionCause(proxy.canCommit());
228         } finally {
229             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
230         }
231     }
232
233     @Test
234     public void testPreCommit() throws Exception {
235         // Precommit is currently a no-op
236         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
237
238         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
239                 new PreCommitTransactionReply());
240
241         proxy.preCommit().get(5, TimeUnit.SECONDS);
242     }
243
244     @Test
245     public void testAbort() throws Exception {
246         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
247
248         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
249
250         proxy.abort().get(5, TimeUnit.SECONDS);
251
252         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
253     }
254
255     @Test
256     public void testAbortWithFailure() throws Exception {
257         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
258
259         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
260
261         // The exception should not get propagated.
262         proxy.abort().get(5, TimeUnit.SECONDS);
263
264         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
265     }
266
267     @Test
268     public void testAbortWithFailedCohortPath() throws Throwable {
269
270         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
271
272         // The exception should not get propagated.
273         proxy.abort().get(5, TimeUnit.SECONDS);
274
275         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
276     }
277
278     @Test
279     public void testCommit() throws Exception {
280
281         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
282
283         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
284                 new CommitTransactionReply());
285
286         proxy.commit().get(5, TimeUnit.SECONDS);
287
288         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
289     }
290
291     @Test(expected = TestException.class)
292     public void testCommitWithFailure() throws Throwable {
293
294         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
295
296         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
297                 new TestException());
298
299         propagateExecutionExceptionCause(proxy.commit());
300     }
301
302     @Test(expected = ExecutionException.class)
303     public void testCommitWithInvalidResponseType() throws Exception {
304
305         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
306
307         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
308
309         proxy.commit().get(5, TimeUnit.SECONDS);
310     }
311
312     @Test(expected = TestException.class)
313     public void testCommitWithFailedCohortPath() throws Throwable {
314
315         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
316
317         try {
318             propagateExecutionExceptionCause(proxy.commit());
319         } finally {
320
321             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
322         }
323
324     }
325
326     @Test
327     public void testAllThreePhasesSuccessful() throws Exception {
328
329         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
330
331         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
332                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
333
334         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
335                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
336
337         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
338                 new CommitTransactionReply(), new CommitTransactionReply());
339
340         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
341
342         proxy.canCommit().get(5, TimeUnit.SECONDS);
343         proxy.preCommit().get(5, TimeUnit.SECONDS);
344         proxy.commit().get(5, TimeUnit.SECONDS);
345
346         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
347         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
348
349     }
350
351     @Test
352     public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
353
354         ThreePhaseCommitCohortProxy proxy = setupProxy(0);
355
356         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
357
358         proxy.canCommit().get(5, TimeUnit.SECONDS);
359         proxy.preCommit().get(5, TimeUnit.SECONDS);
360         proxy.commit().get(5, TimeUnit.SECONDS);
361
362     }
363 }