Adjust Tx rate limiter for unused transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.fail;
5 import static org.mockito.Matchers.any;
6 import static org.mockito.Matchers.anyLong;
7 import static org.mockito.Matchers.isA;
8 import static org.mockito.Mockito.doReturn;
9 import static org.mockito.Mockito.never;
10 import static org.mockito.Mockito.times;
11 import static org.mockito.Mockito.verify;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import akka.dispatch.Futures;
16 import akka.util.Timeout;
17 import com.codahale.metrics.Snapshot;
18 import com.codahale.metrics.Timer;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import java.util.List;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Before;
25 import org.junit.Test;
26 import org.mockito.Mock;
27 import org.mockito.MockitoAnnotations;
28 import org.mockito.stubbing.Stubber;
29 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
39 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
40 import scala.concurrent.Future;
41
42 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
43
44     @SuppressWarnings("serial")
45     static class TestException extends RuntimeException {
46     }
47
48     @Mock
49     private ActorContext actorContext;
50
51     @Mock
52     private DatastoreContext datastoreContext;
53
54     @Mock
55     private Timer commitTimer;
56
57     @Mock
58     private Timer.Context commitTimerContext;
59
60     @Mock
61     private Snapshot commitSnapshot;
62
63     @Before
64     public void setUp() {
65         MockitoAnnotations.initMocks(this);
66
67         doReturn(getSystem()).when(actorContext).getActorSystem();
68         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
69         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
70         doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
71         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
72         doReturn(commitTimerContext).when(commitTimer).time();
73         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
74         for(int i=1;i<11;i++){
75             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
76             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
77             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
78         }
79         doReturn(10.0).when(actorContext).getTxCreationLimit();
80     }
81
82     private Future<ActorSelection> newCohort() {
83         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
84         ActorSelection actorSelection = getSystem().actorSelection(path);
85         return Futures.successful(actorSelection);
86     }
87
88     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
89         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
90         for(int i = 1; i <= nCohorts; i++) {
91             cohortFutures.add(newCohort());
92         }
93
94         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
95     }
96
97     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
98             throws Exception {
99         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
100         cohortFutures.add(newCohort());
101         cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
102
103         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
104     }
105
106     private void setupMockActorContext(Class<?> requestType, Object... responses) {
107         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
108                 .failed((Throwable) responses[0]) : Futures
109                 .successful(((SerializableMessage) responses[0]).toSerializable()));
110
111         for(int i = 1; i < responses.length; i++) {
112             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
113                     .failed((Throwable) responses[i]) : Futures
114                     .successful(((SerializableMessage) responses[i]).toSerializable()));
115         }
116
117         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
118                 isA(requestType), any(Timeout.class));
119     }
120
121     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
122         verify(actorContext, times(nCohorts)).executeOperationAsync(
123                 any(ActorSelection.class), isA(requestType), any(Timeout.class));
124     }
125
126     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
127
128         try {
129             future.get(5, TimeUnit.SECONDS);
130             fail("Expected ExecutionException");
131         } catch(ExecutionException e) {
132             throw e.getCause();
133         }
134     }
135
136     @Test
137     public void testCanCommitWithOneCohort() throws Exception {
138
139         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
140
141         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
142                 CanCommitTransactionReply.YES);
143
144         ListenableFuture<Boolean> future = proxy.canCommit();
145
146         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
147
148         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
149                 CanCommitTransactionReply.NO);
150
151         future = proxy.canCommit();
152
153         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
154
155         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
156     }
157
158     @Test
159     public void testCanCommitWithMultipleCohorts() throws Exception {
160
161         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
162
163         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
164                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
165
166         ListenableFuture<Boolean> future = proxy.canCommit();
167
168         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
169
170         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
171     }
172
173     @Test
174     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
175
176         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
177
178         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
179                 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
180
181         ListenableFuture<Boolean> future = proxy.canCommit();
182
183         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
184
185         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
186     }
187
188     @Test(expected = TestException.class)
189     public void testCanCommitWithExceptionFailure() throws Throwable {
190
191         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
192
193         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
194
195         propagateExecutionExceptionCause(proxy.canCommit());
196     }
197
198     @Test(expected = ExecutionException.class)
199     public void testCanCommitWithInvalidResponseType() throws Exception {
200
201         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
202
203         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
204                 new PreCommitTransactionReply());
205
206         proxy.canCommit().get(5, TimeUnit.SECONDS);
207     }
208
209     @Test(expected = TestException.class)
210     public void testCanCommitWithFailedCohortPath() throws Throwable {
211
212         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
213
214         try {
215             propagateExecutionExceptionCause(proxy.canCommit());
216         } finally {
217             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
218         }
219     }
220
221     @Test
222     public void testPreCommit() throws Exception {
223         // Precommit is currently a no-op
224         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
225
226         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
227                 new PreCommitTransactionReply());
228
229         proxy.preCommit().get(5, TimeUnit.SECONDS);
230     }
231
232     @Test
233     public void testAbort() throws Exception {
234         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
235
236         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
237
238         proxy.abort().get(5, TimeUnit.SECONDS);
239
240         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
241     }
242
243     @Test
244     public void testAbortWithFailure() throws Exception {
245         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
246
247         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
248
249         // The exception should not get propagated.
250         proxy.abort().get(5, TimeUnit.SECONDS);
251
252         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
253     }
254
255     @Test
256     public void testAbortWithFailedCohortPath() throws Throwable {
257
258         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
259
260         // The exception should not get propagated.
261         proxy.abort().get(5, TimeUnit.SECONDS);
262
263         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
264     }
265
266     @Test
267     public void testCommit() throws Exception {
268
269         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
270
271         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
272                 new CommitTransactionReply());
273
274         proxy.commit().get(5, TimeUnit.SECONDS);
275
276         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
277     }
278
279     @Test(expected = TestException.class)
280     public void testCommitWithFailure() throws Throwable {
281
282         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
283
284         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
285                 new TestException());
286
287         propagateExecutionExceptionCause(proxy.commit());
288     }
289
290     @Test(expected = ExecutionException.class)
291     public void testCommitWithInvalidResponseType() throws Exception {
292
293         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
294
295         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
296
297         proxy.commit().get(5, TimeUnit.SECONDS);
298     }
299
300     @Test(expected = TestException.class)
301     public void testCommitWithFailedCohortPath() throws Throwable {
302
303         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
304
305         try {
306             propagateExecutionExceptionCause(proxy.commit());
307         } finally {
308
309             verify(actorContext, never()).setTxCreationLimit(anyLong());
310             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
311         }
312
313     }
314
315     @Test
316     public void testAllThreePhasesSuccessful() throws Exception {
317
318         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
319
320         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
321                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
322
323         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
324                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
325
326         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
327                 new CommitTransactionReply(), new CommitTransactionReply());
328
329         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
330
331         proxy.canCommit().get(5, TimeUnit.SECONDS);
332         proxy.preCommit().get(5, TimeUnit.SECONDS);
333         proxy.commit().get(5, TimeUnit.SECONDS);
334
335         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
336         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
337
338     }
339
340     @Test
341     public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
342
343         ThreePhaseCommitCohortProxy proxy = setupProxy(0);
344
345         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
346
347         proxy.canCommit().get(5, TimeUnit.SECONDS);
348         proxy.preCommit().get(5, TimeUnit.SECONDS);
349         proxy.commit().get(5, TimeUnit.SECONDS);
350
351         verify(actorContext, never()).setTxCreationLimit(anyLong());
352     }
353 }