Merge "Adjust Tx rate limiter for unused transactions"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.anyLong;
7 import static org.mockito.Matchers.isA;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import akka.dispatch.Futures;
16 import akka.util.Timeout;
17 import com.codahale.metrics.Snapshot;
18 import com.codahale.metrics.Timer;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.mockito.Mock;
27 import org.mockito.MockitoAnnotations;
28 import org.mockito.stubbing.Stubber;
29 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.Duration;
42
43 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
44
45     @SuppressWarnings("serial")
46     static class TestException extends RuntimeException {
47     }
48
49     @Mock
50     private ActorContext actorContext;
51
52     @Mock
53     private DatastoreContext datastoreContext;
54
55     @Mock
56     private Timer commitTimer;
57
58     @Mock
59     private Timer.Context commitTimerContext;
60
61     @Mock
62     private Snapshot commitSnapshot;
63
64     @Before
65     public void setUp() {
66         MockitoAnnotations.initMocks(this);
67
68         doReturn(getSystem()).when(actorContext).getActorSystem();
69         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
70         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
71         doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
72         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
73         doReturn(commitTimerContext).when(commitTimer).time();
74         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
75         for(int i=1;i<11;i++){
76             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
77             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
78             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
79         }
80         doReturn(10.0).when(actorContext).getTxCreationLimit();
81     }
82
83     private Future<ActorSelection> newCohort() {
84         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
85         ActorSelection actorSelection = getSystem().actorSelection(path);
86         return Futures.successful(actorSelection);
87     }
88
89     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
90         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
91         for(int i = 1; i <= nCohorts; i++) {
92             cohortFutures.add(newCohort());
93         }
94
95         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
96     }
97
98     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
99             throws Exception {
100         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
101         cohortFutures.add(newCohort());
102         cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
103
104         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
105     }
106
107     private void setupMockActorContext(Class<?> requestType, Object... responses) {
108         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
109                 .failed((Throwable) responses[0]) : Futures
110                 .successful(((SerializableMessage) responses[0]).toSerializable()));
111
112         for(int i = 1; i < responses.length; i++) {
113             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
114                     .failed((Throwable) responses[i]) : Futures
115                     .successful(((SerializableMessage) responses[i]).toSerializable()));
116         }
117
118         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
119                 isA(requestType), any(Timeout.class));
120
121         doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
122                 .when(actorContext).getTransactionCommitOperationTimeout();
123     }
124
125     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
126         verify(actorContext, times(nCohorts)).executeOperationAsync(
127                 any(ActorSelection.class), isA(requestType), any(Timeout.class));
128     }
129
130     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
131
132         try {
133             future.get(5, TimeUnit.SECONDS);
134             fail("Expected ExecutionException");
135         } catch(ExecutionException e) {
136             throw e.getCause();
137         }
138     }
139
140     @Test
141     public void testCanCommitWithOneCohort() throws Exception {
142
143         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
144
145         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
146                 CanCommitTransactionReply.YES);
147
148         ListenableFuture<Boolean> future = proxy.canCommit();
149
150         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
151
152         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
153                 CanCommitTransactionReply.NO);
154
155         future = proxy.canCommit();
156
157         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
158
159         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
160     }
161
162     @Test
163     public void testCanCommitWithMultipleCohorts() throws Exception {
164
165         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
166
167         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
168                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
169
170         ListenableFuture<Boolean> future = proxy.canCommit();
171
172         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
173
174         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
175     }
176
177     @Test
178     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
179
180         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
181
182         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
183                 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
184
185         ListenableFuture<Boolean> future = proxy.canCommit();
186
187         Boolean actual = future.get(5, TimeUnit.SECONDS);
188
189         assertEquals("canCommit", false, actual);
190
191         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
192     }
193
194     @Test(expected = TestException.class)
195     public void testCanCommitWithExceptionFailure() throws Throwable {
196
197         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
198
199         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
200
201         propagateExecutionExceptionCause(proxy.canCommit());
202     }
203
204     @Test(expected = ExecutionException.class)
205     public void testCanCommitWithInvalidResponseType() throws Exception {
206
207         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
208
209         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
210                 new PreCommitTransactionReply());
211
212         proxy.canCommit().get(5, TimeUnit.SECONDS);
213     }
214
215     @Test(expected = TestException.class)
216     public void testCanCommitWithFailedCohortPath() throws Throwable {
217
218         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
219
220         try {
221             propagateExecutionExceptionCause(proxy.canCommit());
222         } finally {
223             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
224         }
225     }
226
227     @Test
228     public void testPreCommit() throws Exception {
229         // Precommit is currently a no-op
230         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
231
232         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
233                 new PreCommitTransactionReply());
234
235         proxy.preCommit().get(5, TimeUnit.SECONDS);
236     }
237
238     @Test
239     public void testAbort() throws Exception {
240         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
241
242         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
243
244         proxy.abort().get(5, TimeUnit.SECONDS);
245
246         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
247     }
248
249     @Test
250     public void testAbortWithFailure() throws Exception {
251         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
252
253         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
254
255         // The exception should not get propagated.
256         proxy.abort().get(5, TimeUnit.SECONDS);
257
258         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
259     }
260
261     @Test
262     public void testAbortWithFailedCohortPath() throws Throwable {
263
264         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
265
266         // The exception should not get propagated.
267         proxy.abort().get(5, TimeUnit.SECONDS);
268
269         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
270     }
271
272     @Test
273     public void testCommit() throws Exception {
274
275         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
276
277         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
278                 new CommitTransactionReply());
279
280         proxy.commit().get(5, TimeUnit.SECONDS);
281
282         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
283     }
284
285     @Test(expected = TestException.class)
286     public void testCommitWithFailure() throws Throwable {
287
288         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
289
290         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
291                 new TestException());
292
293         propagateExecutionExceptionCause(proxy.commit());
294     }
295
296     @Test(expected = ExecutionException.class)
297     public void testCommitWithInvalidResponseType() throws Exception {
298
299         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
300
301         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
302
303         proxy.commit().get(5, TimeUnit.SECONDS);
304     }
305
306     @Test(expected = TestException.class)
307     public void testCommitWithFailedCohortPath() throws Throwable {
308
309         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
310
311         try {
312             propagateExecutionExceptionCause(proxy.commit());
313         } finally {
314
315             verify(actorContext, never()).setTxCreationLimit(anyLong());
316             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
317         }
318
319     }
320
321     @Test
322     public void testAllThreePhasesSuccessful() throws Exception {
323
324         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
325
326         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
327                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
328
329         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
330                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
331
332         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
333                 new CommitTransactionReply(), new CommitTransactionReply());
334
335         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
336
337         proxy.canCommit().get(5, TimeUnit.SECONDS);
338         proxy.preCommit().get(5, TimeUnit.SECONDS);
339         proxy.commit().get(5, TimeUnit.SECONDS);
340
341         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
342         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
343
344     }
345
346     @Test
347     public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
348
349         ThreePhaseCommitCohortProxy proxy = setupProxy(0);
350
351         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
352
353         proxy.canCommit().get(5, TimeUnit.SECONDS);
354         proxy.preCommit().get(5, TimeUnit.SECONDS);
355         proxy.commit().get(5, TimeUnit.SECONDS);
356
357         verify(actorContext, never()).setTxCreationLimit(anyLong());
358     }
359 }