Deprecate 3PC protobuff messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.isA;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.times;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
19 import akka.actor.ActorPath;
20 import akka.actor.ActorSelection;
21 import akka.actor.Props;
22 import akka.dispatch.Futures;
23 import akka.util.Timeout;
24 import com.codahale.metrics.Snapshot;
25 import com.codahale.metrics.Timer;
26 import com.google.common.collect.Lists;
27 import com.google.common.util.concurrent.ListenableFuture;
28 import java.util.List;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.mockito.Mock;
34 import org.mockito.MockitoAnnotations;
35 import org.mockito.stubbing.Stubber;
36 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
43 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
44 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
45 import scala.concurrent.Future;
46 import scala.concurrent.duration.Duration;
47
48 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
49
50     @SuppressWarnings("serial")
51     static class TestException extends RuntimeException {
52     }
53
54     @Mock
55     private ActorContext actorContext;
56
57     @Mock
58     private DatastoreContext datastoreContext;
59
60     @Mock
61     private Timer commitTimer;
62
63     @Mock
64     private Timer.Context commitTimerContext;
65
66     @Mock
67     private Snapshot commitSnapshot;
68
69     @Before
70     public void setUp() {
71         MockitoAnnotations.initMocks(this);
72
73         doReturn(getSystem()).when(actorContext).getActorSystem();
74         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(actorContext).getClientDispatcher();
75         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
76         doReturn(30).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
77         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
78         doReturn(commitTimerContext).when(commitTimer).time();
79         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
80         for(int i=1;i<11;i++){
81             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
82             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
83             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
84         }
85         doReturn(10.0).when(actorContext).getTxCreationLimit();
86     }
87
88     private Future<ActorSelection> newCohort() {
89         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
90         ActorSelection actorSelection = getSystem().actorSelection(path);
91         return Futures.successful(actorSelection);
92     }
93
94     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
95         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
96         for(int i = 1; i <= nCohorts; i++) {
97             cohortFutures.add(newCohort());
98         }
99
100         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
101     }
102
103     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
104             throws Exception {
105         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
106         cohortFutures.add(newCohort());
107         cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
108
109         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
110     }
111
112     private void setupMockActorContext(Class<?> requestType, Object... responses) {
113         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
114                 .failed((Throwable) responses[0]) : Futures
115                 .successful(((SerializableMessage) responses[0]).toSerializable()));
116
117         for(int i = 1; i < responses.length; i++) {
118             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
119                     .failed((Throwable) responses[i]) : Futures
120                     .successful(((SerializableMessage) responses[i]).toSerializable()));
121         }
122
123         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
124                 isA(requestType), any(Timeout.class));
125
126         doReturn(new Timeout(Duration.apply(1000, TimeUnit.MILLISECONDS)))
127                 .when(actorContext).getTransactionCommitOperationTimeout();
128     }
129
130     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
131         verify(actorContext, times(nCohorts)).executeOperationAsync(
132                 any(ActorSelection.class), isA(requestType), any(Timeout.class));
133     }
134
135     private static void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
136
137         try {
138             future.get(5, TimeUnit.SECONDS);
139             fail("Expected ExecutionException");
140         } catch(ExecutionException e) {
141             throw e.getCause();
142         }
143     }
144
145     @Test
146     public void testCanCommitWithOneCohort() throws Exception {
147
148         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
149
150         setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.yes(CURRENT_VERSION));
151
152         ListenableFuture<Boolean> future = proxy.canCommit();
153
154         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
155
156         setupMockActorContext(CanCommitTransaction.class, CanCommitTransactionReply.no(CURRENT_VERSION));
157
158         future = proxy.canCommit();
159
160         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
161
162         verifyCohortInvocations(2, CanCommitTransaction.class);
163     }
164
165     @Test
166     public void testCanCommitWithMultipleCohorts() throws Exception {
167
168         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
169
170         setupMockActorContext(CanCommitTransaction.class,
171                 CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
172
173         ListenableFuture<Boolean> future = proxy.canCommit();
174
175         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
176
177         verifyCohortInvocations(2, CanCommitTransaction.class);
178     }
179
180     @Test
181     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
182
183         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
184
185         setupMockActorContext(CanCommitTransaction.class,
186                 CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.no(CURRENT_VERSION),
187                 CanCommitTransactionReply.yes(CURRENT_VERSION));
188
189         ListenableFuture<Boolean> future = proxy.canCommit();
190
191         Boolean actual = future.get(5, TimeUnit.SECONDS);
192
193         assertEquals("canCommit", false, actual);
194
195         verifyCohortInvocations(2, CanCommitTransaction.class);
196     }
197
198     @Test(expected = TestException.class)
199     public void testCanCommitWithExceptionFailure() throws Throwable {
200
201         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
202
203         setupMockActorContext(CanCommitTransaction.class, new TestException());
204
205         propagateExecutionExceptionCause(proxy.canCommit());
206     }
207
208     @Test(expected = ExecutionException.class)
209     public void testCanCommitWithInvalidResponseType() throws Exception {
210
211         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
212
213         setupMockActorContext(CanCommitTransaction.class,
214                 new CommitTransactionReply());
215
216         proxy.canCommit().get(5, TimeUnit.SECONDS);
217     }
218
219     @Test(expected = TestException.class)
220     public void testCanCommitWithFailedCohortPath() throws Throwable {
221
222         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
223
224         try {
225             propagateExecutionExceptionCause(proxy.canCommit());
226         } finally {
227             verifyCohortInvocations(0, CanCommitTransaction.class);
228         }
229     }
230
231     @Test
232     public void testPreCommit() throws Exception {
233         // Precommit is currently a no-op
234         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
235
236         proxy.preCommit().get(5, TimeUnit.SECONDS);
237     }
238
239     @Test
240     public void testAbort() throws Exception {
241         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
242
243         setupMockActorContext(AbortTransaction.class, new AbortTransactionReply());
244
245         proxy.abort().get(5, TimeUnit.SECONDS);
246
247         verifyCohortInvocations(1, AbortTransaction.class);
248     }
249
250     @Test
251     public void testAbortWithFailure() throws Exception {
252         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
253
254         setupMockActorContext(AbortTransaction.class, new RuntimeException("mock"));
255
256         // The exception should not get propagated.
257         proxy.abort().get(5, TimeUnit.SECONDS);
258
259         verifyCohortInvocations(1, AbortTransaction.class);
260     }
261
262     @Test
263     public void testAbortWithFailedCohortPath() throws Throwable {
264
265         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
266
267         // The exception should not get propagated.
268         proxy.abort().get(5, TimeUnit.SECONDS);
269
270         verifyCohortInvocations(0, AbortTransaction.class);
271     }
272
273     @Test
274     public void testCommit() throws Exception {
275
276         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
277
278         setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
279                 new CommitTransactionReply());
280
281         proxy.commit().get(5, TimeUnit.SECONDS);
282
283         verifyCohortInvocations(2, CommitTransaction.class);
284     }
285
286     @Test(expected = TestException.class)
287     public void testCommitWithFailure() throws Throwable {
288
289         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
290
291         setupMockActorContext(CommitTransaction.class, new CommitTransactionReply(),
292                 new TestException());
293
294         propagateExecutionExceptionCause(proxy.commit());
295     }
296
297     @Test(expected = ExecutionException.class)
298     public void testCommitWithInvalidResponseType() throws Exception {
299
300         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
301
302         setupMockActorContext(CommitTransaction.class, new AbortTransactionReply());
303
304         proxy.commit().get(5, TimeUnit.SECONDS);
305     }
306
307     @Test(expected = TestException.class)
308     public void testCommitWithFailedCohortPath() throws Throwable {
309
310         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
311
312         try {
313             propagateExecutionExceptionCause(proxy.commit());
314         } finally {
315
316             verifyCohortInvocations(0, CommitTransaction.class);
317         }
318
319     }
320
321     @Test
322     public void testAllThreePhasesSuccessful() throws Exception {
323
324         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
325
326         setupMockActorContext(CanCommitTransaction.class,
327                 CanCommitTransactionReply.yes(CURRENT_VERSION), CanCommitTransactionReply.yes(CURRENT_VERSION));
328
329         setupMockActorContext(CommitTransaction.class,
330                 new CommitTransactionReply(), new CommitTransactionReply());
331
332         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
333
334         proxy.canCommit().get(5, TimeUnit.SECONDS);
335         proxy.preCommit().get(5, TimeUnit.SECONDS);
336         proxy.commit().get(5, TimeUnit.SECONDS);
337
338         verifyCohortInvocations(2, CanCommitTransaction.class);
339         verifyCohortInvocations(2, CommitTransaction.class);
340
341     }
342
343     @Test
344     public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
345
346         ThreePhaseCommitCohortProxy proxy = setupProxy(0);
347
348         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
349
350         proxy.canCommit().get(5, TimeUnit.SECONDS);
351         proxy.preCommit().get(5, TimeUnit.SECONDS);
352         proxy.commit().get(5, TimeUnit.SECONDS);
353
354     }
355 }