Merge "Bug 2673: Binding cursor-based data change API"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyDouble;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.util.Timeout;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.stubbing.Stubber;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import scala.concurrent.Future;
42
43 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
44
45     @SuppressWarnings("serial")
46     static class TestException extends RuntimeException {
47     }
48
49     @Mock
50     private ActorContext actorContext;
51
52     @Mock
53     private DatastoreContext datastoreContext;
54
55     @Mock
56     private Timer commitTimer;
57
58     @Mock
59     private Timer.Context commitTimerContext;
60
61     @Mock
62     private Snapshot commitSnapshot;
63
64     @Before
65     public void setUp() {
66         MockitoAnnotations.initMocks(this);
67
68         doReturn(getSystem()).when(actorContext).getActorSystem();
69         doReturn(datastoreContext).when(actorContext).getDatastoreContext();
70         doReturn(100).when(datastoreContext).getShardTransactionCommitTimeoutInSeconds();
71         doReturn(commitTimer).when(actorContext).getOperationTimer("commit");
72         doReturn(commitTimerContext).when(commitTimer).time();
73         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
74         doReturn(TimeUnit.MILLISECONDS.toNanos(2000) * 1.0).when(commitSnapshot).get95thPercentile();
75         doReturn(10.0).when(actorContext).getTxCreationLimit();
76     }
77
78     private Future<ActorSelection> newCohort() {
79         ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
80         ActorSelection actorSelection = getSystem().actorSelection(path);
81         return Futures.successful(actorSelection);
82     }
83
84     private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
85         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
86         for(int i = 1; i <= nCohorts; i++) {
87             cohortFutures.add(newCohort());
88         }
89
90         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
91     }
92
93     private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
94             throws Exception {
95         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
96         cohortFutures.add(newCohort());
97         cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
98
99         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
100     }
101
102     private void setupMockActorContext(Class<?> requestType, Object... responses) {
103         Stubber stubber = doReturn(responses[0] instanceof Throwable ? Futures
104                 .failed((Throwable) responses[0]) : Futures
105                 .successful(((SerializableMessage) responses[0]).toSerializable()));
106
107         for(int i = 1; i < responses.length; i++) {
108             stubber = stubber.doReturn(responses[i] instanceof Throwable ? Futures
109                     .failed((Throwable) responses[i]) : Futures
110                     .successful(((SerializableMessage) responses[i]).toSerializable()));
111         }
112
113         stubber.when(actorContext).executeOperationAsync(any(ActorSelection.class),
114                 isA(requestType), any(Timeout.class));
115     }
116
117     private void verifyCohortInvocations(int nCohorts, Class<?> requestType) {
118         verify(actorContext, times(nCohorts)).executeOperationAsync(
119                 any(ActorSelection.class), isA(requestType), any(Timeout.class));
120     }
121
122     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
123
124         try {
125             future.get(5, TimeUnit.SECONDS);
126             fail("Expected ExecutionException");
127         } catch(ExecutionException e) {
128             throw e.getCause();
129         }
130     }
131
132     @Test
133     public void testCanCommitWithOneCohort() throws Exception {
134
135         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
136
137         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
138                 CanCommitTransactionReply.YES);
139
140         ListenableFuture<Boolean> future = proxy.canCommit();
141
142         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
143
144         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
145                 CanCommitTransactionReply.NO);
146
147         future = proxy.canCommit();
148
149         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
150
151         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
152     }
153
154     @Test
155     public void testCanCommitWithMultipleCohorts() throws Exception {
156
157         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
158
159         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
160                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
161
162         ListenableFuture<Boolean> future = proxy.canCommit();
163
164         assertEquals("canCommit", true, future.get(5, TimeUnit.SECONDS));
165
166         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
167     }
168
169     @Test
170     public void testCanCommitWithMultipleCohortsAndOneFailure() throws Exception {
171
172         ThreePhaseCommitCohortProxy proxy = setupProxy(3);
173
174         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
175                 CanCommitTransactionReply.YES, CanCommitTransactionReply.NO, CanCommitTransactionReply.YES);
176
177         ListenableFuture<Boolean> future = proxy.canCommit();
178
179         assertEquals("canCommit", false, future.get(5, TimeUnit.SECONDS));
180
181         verifyCohortInvocations(3, CanCommitTransaction.SERIALIZABLE_CLASS);
182     }
183
184     @Test(expected = TestException.class)
185     public void testCanCommitWithExceptionFailure() throws Throwable {
186
187         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
188
189         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS, new TestException());
190
191         propagateExecutionExceptionCause(proxy.canCommit());
192     }
193
194     @Test(expected = ExecutionException.class)
195     public void testCanCommitWithInvalidResponseType() throws Exception {
196
197         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
198
199         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
200                 new PreCommitTransactionReply());
201
202         proxy.canCommit().get(5, TimeUnit.SECONDS);
203     }
204
205     @Test(expected = TestException.class)
206     public void testCanCommitWithFailedCohortPath() throws Throwable {
207
208         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
209
210         try {
211             propagateExecutionExceptionCause(proxy.canCommit());
212         } finally {
213             verifyCohortInvocations(0, CanCommitTransaction.SERIALIZABLE_CLASS);
214         }
215     }
216
217     @Test
218     public void testPreCommit() throws Exception {
219         // Precommit is currently a no-op
220         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
221
222         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
223                 new PreCommitTransactionReply());
224
225         proxy.preCommit().get(5, TimeUnit.SECONDS);
226     }
227
228     @Test
229     public void testAbort() throws Exception {
230         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
231
232         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new AbortTransactionReply());
233
234         proxy.abort().get(5, TimeUnit.SECONDS);
235
236         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
237     }
238
239     @Test
240     public void testAbortWithFailure() throws Exception {
241         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
242
243         setupMockActorContext(AbortTransaction.SERIALIZABLE_CLASS, new RuntimeException("mock"));
244
245         // The exception should not get propagated.
246         proxy.abort().get(5, TimeUnit.SECONDS);
247
248         verifyCohortInvocations(1, AbortTransaction.SERIALIZABLE_CLASS);
249     }
250
251     @Test
252     public void testAbortWithFailedCohortPath() throws Throwable {
253
254         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
255
256         // The exception should not get propagated.
257         proxy.abort().get(5, TimeUnit.SECONDS);
258
259         verifyCohortInvocations(0, AbortTransaction.SERIALIZABLE_CLASS);
260     }
261
262     @Test
263     public void testCommit() throws Exception {
264
265         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
266
267         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
268                 new CommitTransactionReply());
269
270         proxy.commit().get(5, TimeUnit.SECONDS);
271
272         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
273     }
274
275     @Test(expected = TestException.class)
276     public void testCommitWithFailure() throws Throwable {
277
278         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
279
280         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new CommitTransactionReply(),
281                 new TestException());
282
283         propagateExecutionExceptionCause(proxy.commit());
284     }
285
286     @Test(expected = ExecutionException.class)
287     public void testCommitWithInvalidResponseType() throws Exception {
288
289         ThreePhaseCommitCohortProxy proxy = setupProxy(1);
290
291         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS, new PreCommitTransactionReply());
292
293         proxy.commit().get(5, TimeUnit.SECONDS);
294     }
295
296     @Test(expected = TestException.class)
297     public void testCommitWithFailedCohortPath() throws Throwable {
298
299         ThreePhaseCommitCohortProxy proxy = setupProxyWithFailedCohortPath();
300
301         try {
302             propagateExecutionExceptionCause(proxy.commit());
303         } finally {
304
305             verify(actorContext, never()).setTxCreationLimit(anyLong());
306             verifyCohortInvocations(0, CommitTransaction.SERIALIZABLE_CLASS);
307         }
308
309     }
310
311     @Test
312     public void testAllThreePhasesSuccessful() throws Exception {
313
314         ThreePhaseCommitCohortProxy proxy = setupProxy(2);
315
316         setupMockActorContext(CanCommitTransaction.SERIALIZABLE_CLASS,
317                 CanCommitTransactionReply.YES, CanCommitTransactionReply.YES);
318
319         setupMockActorContext(PreCommitTransaction.SERIALIZABLE_CLASS,
320                 new PreCommitTransactionReply(), new PreCommitTransactionReply());
321
322         setupMockActorContext(CommitTransaction.SERIALIZABLE_CLASS,
323                 new CommitTransactionReply(), new CommitTransactionReply());
324
325         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
326
327         proxy.canCommit().get(5, TimeUnit.SECONDS);
328         proxy.preCommit().get(5, TimeUnit.SECONDS);
329         proxy.commit().get(5, TimeUnit.SECONDS);
330
331         verifyCohortInvocations(2, CanCommitTransaction.SERIALIZABLE_CLASS);
332         verifyCohortInvocations(2, CommitTransaction.SERIALIZABLE_CLASS);
333
334         // Verify that the creation limit was changed to 0.5 (based on setup)
335         verify(actorContext, timeout(5000)).setTxCreationLimit(0.5);
336     }
337
338     @Test
339     public void testDoNotChangeTxCreationLimitWhenCommittingEmptyTxn() throws Exception {
340
341         ThreePhaseCommitCohortProxy proxy = setupProxy(0);
342
343         assertEquals(10.0, actorContext.getTxCreationLimit(), 1e-15);
344
345         proxy.canCommit().get(5, TimeUnit.SECONDS);
346         proxy.preCommit().get(5, TimeUnit.SECONDS);
347         proxy.commit().get(5, TimeUnit.SECONDS);
348
349         verify(actorContext, never()).setTxCreationLimit(anyLong());
350     }
351 }