BUG-5280: remove support for talking to pre-Boron clients
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ThreePhaseCommitCohortProxyTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doReturn;
15 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
16 import akka.actor.ActorSelection;
17 import akka.actor.Props;
18 import akka.actor.UntypedActor;
19 import akka.dispatch.Dispatchers;
20 import akka.dispatch.Futures;
21 import akka.testkit.TestActorRef;
22 import com.codahale.metrics.Snapshot;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Supplier;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import java.util.ArrayList;
27 import java.util.Arrays;
28 import java.util.Collections;
29 import java.util.List;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.mockito.Mock;
36 import org.mockito.MockitoAnnotations;
37 import org.opendaylight.controller.cluster.datastore.ThreePhaseCommitCohortProxy.CohortInfo;
38 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.AbstractThreePhaseCommitMessage;
41 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
46 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
47 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
48 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
49 import org.opendaylight.controller.cluster.raft.TestActorFactory;
50 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
51
52 public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
53
54     @SuppressWarnings("serial")
55     static class TestException extends RuntimeException {
56     }
57
58     private ActorContext actorContext;
59
60     @Mock
61     private Timer commitTimer;
62
63     @Mock
64     private Timer.Context commitTimerContext;
65
66     @Mock
67     private Snapshot commitSnapshot;
68
69     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
70     private final List<TestActorRef<CohortActor>> cohortActors = new ArrayList<>();
71
72     @Before
73     public void setUp() {
74         MockitoAnnotations.initMocks(this);
75
76         actorContext = new ActorContext(getSystem(), actorFactory.createActor(Props.create(DoNothingActor.class)),
77                 new MockClusterWrapper(), new MockConfiguration(),
78                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()) {
79                     @Override
80                     public Timer getOperationTimer(String operationName) {
81                         return commitTimer;
82                     }
83
84                     @Override
85                     public double getTxCreationLimit() {
86                         return 10.0;
87                     }
88                 };
89
90         doReturn(commitTimerContext).when(commitTimer).time();
91         doReturn(commitSnapshot).when(commitTimer).getSnapshot();
92         for(int i=1;i<11;i++){
93             // Keep on increasing the amount of time it takes to complete transaction for each tenth of a
94             // percentile. Essentially this would be 1ms for the 10th percentile, 2ms for 20th percentile and so on.
95             doReturn(TimeUnit.MILLISECONDS.toNanos(i) * 1D).when(commitSnapshot).getValue(i * 0.1);
96         }
97     }
98
99     @Test
100     public void testCanCommitYesWithOneCohort() throws Exception {
101         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
102                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
103                         CanCommitTransactionReply.yes(CURRENT_VERSION)))), "txn-1");
104
105         verifyCanCommit(proxy.canCommit(), true);
106         verifyCohortActors();
107     }
108
109     @Test
110     public void testCanCommitNoWithOneCohort() throws Exception {
111         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
112                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
113                         CanCommitTransactionReply.no(CURRENT_VERSION)))), "txn-1");
114
115         verifyCanCommit(proxy.canCommit(), false);
116         verifyCohortActors();
117     }
118
119     @Test
120     public void testCanCommitYesWithTwoCohorts() throws Exception {
121         List<CohortInfo> cohorts = Arrays.asList(
122                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
123                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
124                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
125                         CanCommitTransactionReply.yes(CURRENT_VERSION))));
126         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
127
128         verifyCanCommit(proxy.canCommit(), true);
129         verifyCohortActors();
130     }
131
132     @Test
133     public void testCanCommitNoWithThreeCohorts() throws Exception {
134         List<CohortInfo> cohorts = Arrays.asList(
135                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
136                         CanCommitTransactionReply.yes(CURRENT_VERSION))),
137                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(
138                         CanCommitTransactionReply.no(CURRENT_VERSION))),
139                 newCohortInfo(new CohortActor.Builder("txn-1")));
140         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
141
142         verifyCanCommit(proxy.canCommit(), false);
143         verifyCohortActors();
144     }
145
146     @Test(expected = TestException.class)
147     public void testCanCommitWithExceptionFailure() throws Throwable {
148         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
149                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit(new TestException()))), "txn-1");
150
151         propagateExecutionExceptionCause(proxy.canCommit());
152     }
153
154     @Test(expected = IllegalArgumentException.class)
155     public void testCanCommitWithInvalidResponseType() throws Throwable {
156         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
157                 newCohortInfo(new CohortActor.Builder("txn-1").expectCanCommit("invalid"))), "txn-1");
158
159         propagateExecutionExceptionCause(proxy.canCommit());
160     }
161
162     @Test(expected = TestException.class)
163     public void testCanCommitWithFailedCohortFuture() throws Throwable {
164         List<CohortInfo> cohorts = Arrays.asList(
165                 newCohortInfo(new CohortActor.Builder("txn-1")),
166                 newCohortInfoWithFailedFuture(new TestException()),
167                 newCohortInfo(new CohortActor.Builder("txn-1")));
168         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
169
170         propagateExecutionExceptionCause(proxy.canCommit());
171     }
172
173     @Test
174     public void testAllThreePhasesSuccessful() throws Exception {
175         List<CohortInfo> cohorts = Arrays.asList(
176                 newCohortInfo(new CohortActor.Builder("txn-1").
177                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
178                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
179                 newCohortInfo(new CohortActor.Builder("txn-1").
180                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
181                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))));
182         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
183
184         verifyCanCommit(proxy.canCommit(), true);
185         verifySuccessfulFuture(proxy.preCommit());
186         verifySuccessfulFuture(proxy.commit());
187         verifyCohortActors();
188     }
189
190     @Test(expected = TestException.class)
191     public void testCommitWithExceptionFailure() throws Throwable {
192         List<CohortInfo> cohorts = Arrays.asList(
193                 newCohortInfo(new CohortActor.Builder("txn-1").
194                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
195                         expectCommit(CommitTransactionReply.instance(CURRENT_VERSION))),
196                 newCohortInfo(new CohortActor.Builder("txn-1").
197                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
198                         expectCommit(new TestException())));
199         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
200
201         verifyCanCommit(proxy.canCommit(), true);
202         verifySuccessfulFuture(proxy.preCommit());
203         propagateExecutionExceptionCause(proxy.commit());
204     }
205
206     @Test(expected = IllegalArgumentException.class)
207     public void testCommitWithInvalidResponseType() throws Throwable {
208         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
209                 newCohortInfo(new CohortActor.Builder("txn-1").
210                         expectCanCommit(CanCommitTransactionReply.yes(CURRENT_VERSION)).
211                         expectCommit("invalid"))), "txn-1");
212
213         verifyCanCommit(proxy.canCommit(), true);
214         verifySuccessfulFuture(proxy.preCommit());
215         propagateExecutionExceptionCause(proxy.commit());
216     }
217
218     @Test
219     public void testAbort() throws Exception {
220         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
221                 newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(
222                         AbortTransactionReply.instance(CURRENT_VERSION)))), "txn-1");
223
224         verifySuccessfulFuture(proxy.abort());
225         verifyCohortActors();
226     }
227
228     @Test
229     public void testAbortWithFailure() throws Exception {
230         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, Arrays.asList(
231                 newCohortInfo(new CohortActor.Builder("txn-1").expectAbort(new RuntimeException("mock")))), "txn-1");
232
233         // The exception should not get propagated.
234         verifySuccessfulFuture(proxy.abort());
235         verifyCohortActors();
236     }
237
238     @Test
239     public void testAbortWithFailedCohortFuture() throws Throwable {
240         List<CohortInfo> cohorts = Arrays.asList(
241                 newCohortInfoWithFailedFuture(new TestException()),
242                 newCohortInfo(new CohortActor.Builder("txn-1")));
243         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext, cohorts, "txn-1");
244
245         verifySuccessfulFuture(proxy.abort());
246         verifyCohortActors();
247     }
248
249     @Test
250     public void testWithNoCohorts() throws Exception {
251         ThreePhaseCommitCohortProxy proxy = new ThreePhaseCommitCohortProxy(actorContext,
252                 Collections.<CohortInfo>emptyList(), "txn-1");
253
254         verifyCanCommit(proxy.canCommit(), true);
255         verifySuccessfulFuture(proxy.preCommit());
256         verifySuccessfulFuture(proxy.commit());
257         verifyCohortActors();
258     }
259
260     private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Throwable {
261
262         try {
263             future.get(5, TimeUnit.SECONDS);
264             fail("Expected ExecutionException");
265         } catch(ExecutionException e) {
266             verifyCohortActors();
267             throw e.getCause();
268         }
269     }
270
271     private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
272         TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props().
273                 withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
274         cohortActors.add(actor);
275         return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), new Supplier<Short>() {
276             @Override
277             public Short get() {
278                 return version;
279             }
280         });
281     }
282
283     private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
284         return new CohortInfo(Futures.<ActorSelection>failed(failure), new Supplier<Short>() {
285             @Override
286             public Short get() {
287                 return CURRENT_VERSION;
288             }
289         });
290     }
291
292     private CohortInfo newCohortInfo(CohortActor.Builder builder) {
293         return newCohortInfo(builder, CURRENT_VERSION);
294     }
295
296     private void verifyCohortActors() {
297         for(TestActorRef<CohortActor> actor: cohortActors) {
298             actor.underlyingActor().verify();
299         }
300     }
301
302     private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
303         try {
304             return future.get(5, TimeUnit.SECONDS);
305         } catch(Exception e) {
306             verifyCohortActors();
307             throw e;
308         }
309     }
310
311     private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
312         Boolean actual = verifySuccessfulFuture(future);
313         assertEquals("canCommit", expected, actual);
314     }
315
316     private static class CohortActor extends UntypedActor {
317         private final Builder builder;
318         private final AtomicInteger canCommitCount = new AtomicInteger();
319         private final AtomicInteger commitCount = new AtomicInteger();
320         private final AtomicInteger abortCount = new AtomicInteger();
321         private volatile AssertionError assertionError;
322
323         private CohortActor(Builder builder) {
324             this.builder = builder;
325         }
326
327         @Override
328         public void onReceive(Object message) {
329             if(CanCommitTransaction.isSerializedType(message)) {
330                 canCommitCount.incrementAndGet();
331                 onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
332                         builder.expCanCommitType, builder.canCommitReply);
333             } else if(CommitTransaction.isSerializedType(message)) {
334                 commitCount.incrementAndGet();
335                 onMessage("CommitTransaction", message, CommitTransaction.fromSerializable(message),
336                         builder.expCommitType, builder.commitReply);
337             } else if(AbortTransaction.isSerializedType(message)) {
338                 abortCount.incrementAndGet();
339                 onMessage("AbortTransaction", message, AbortTransaction.fromSerializable(message),
340                         builder.expAbortType, builder.abortReply);
341             } else {
342                 assertionError = new AssertionError("Unexpected message " + message);
343             }
344         }
345
346         private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
347                 Class<?> expType, Object reply) {
348             try {
349                 assertNotNull("Unexpected " + name, expType);
350                 assertEquals(name + " type", expType, rawMessage.getClass());
351                 assertEquals(name + " transactionId", builder.transactionId, actualMessage.getTransactionID());
352
353                 if(reply instanceof Throwable) {
354                     getSender().tell(new akka.actor.Status.Failure((Throwable)reply), self());
355                 } else {
356                     getSender().tell(reply, self());
357                 }
358             } catch(AssertionError e) {
359                 assertionError = e;
360             }
361         }
362
363         void verify() {
364             if(assertionError != null) {
365                 throw assertionError;
366             }
367
368             if(builder.expCanCommitType != null) {
369                 assertEquals("CanCommitTransaction count", 1, canCommitCount.get());
370             }
371
372             if(builder.expCommitType != null) {
373                 assertEquals("CommitTransaction count", 1, commitCount.get());
374             }
375
376             if(builder.expAbortType != null) {
377                 assertEquals("AbortTransaction count", 1, abortCount.get());
378             }
379         }
380
381         static class Builder {
382             private Class<?> expCanCommitType;
383             private Class<?> expCommitType;
384             private Class<?> expAbortType;
385             private Object canCommitReply;
386             private Object commitReply;
387             private Object abortReply;
388             private final String transactionId;
389
390             Builder(String transactionId) {
391                 this.transactionId = transactionId;
392             }
393
394             Builder expectCanCommit(Class<?> expCanCommitType, Object canCommitReply) {
395                 this.expCanCommitType = expCanCommitType;
396                 this.canCommitReply = canCommitReply;
397                 return this;
398             }
399
400             Builder expectCanCommit(Object canCommitReply) {
401                 return expectCanCommit(CanCommitTransaction.class, canCommitReply);
402             }
403
404             Builder expectCommit(Class<?> expCommitType, Object commitReply) {
405                 this.expCommitType = expCommitType;
406                 this.commitReply = commitReply;
407                 return this;
408             }
409
410             Builder expectCommit(Object commitReply) {
411                 return expectCommit(CommitTransaction.class, commitReply);
412             }
413
414             Builder expectAbort(Class<?> expAbortType, Object abortReply) {
415                 this.expAbortType = expAbortType;
416                 this.abortReply = abortReply;
417                 return this;
418             }
419
420             Builder expectAbort(Object abortReply) {
421                 return expectAbort(AbortTransaction.class, abortReply);
422             }
423
424             Props props() {
425                 return Props.create(CohortActor.class, this);
426             }
427         }
428     }
429 }