Fixup checkstyle
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / ClientTransactionCommitCohortTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.doReturn;
12 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
13 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
14 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
15 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
16 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
17
18 import akka.actor.ActorSystem;
19 import akka.testkit.TestProbe;
20 import akka.testkit.javadsl.TestKit;
21 import com.google.common.primitives.UnsignedLong;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.Optional;
27 import java.util.concurrent.ExecutionException;
28 import java.util.function.Consumer;
29 import java.util.function.Function;
30 import java.util.stream.Collectors;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.junit.runner.RunWith;
35 import org.mockito.Mock;
36 import org.mockito.junit.MockitoJUnitRunner;
37 import org.opendaylight.controller.cluster.access.ABIVersion;
38 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
39 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
40 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
41 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
42 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
43 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
44 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
45 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
46 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
47 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
48 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
49 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
50 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
51 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
52 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
53 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
54 import org.opendaylight.mdsal.common.api.CommitInfo;
55 import org.opendaylight.yangtools.yang.common.Empty;
56
57 @RunWith(MockitoJUnitRunner.StrictStubs.class)
58 public class ClientTransactionCommitCohortTest {
59     private static final String PERSISTENCE_ID = "per-1";
60     private static final int TRANSACTIONS = 3;
61
62     private final List<TransactionTester<RemoteProxyTransaction>> transactions = new ArrayList<>();
63
64     @Mock
65     private AbstractClientHistory history;
66     @Mock
67     private DatastoreContext datastoreContext;
68     @Mock
69     private ActorUtils actorUtils;
70
71     private ActorSystem system;
72     private ClientTransactionCommitCohort cohort;
73
74     @Before
75     public void setUp() {
76         system = ActorSystem.apply();
77         final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
78         final ClientActorContext context =
79                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
80         doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
81         doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
82         doReturn(actorUtils).when(history).actorUtils();
83
84         for (int i = 0; i < TRANSACTIONS; i++) {
85             transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
86         }
87         final Collection<AbstractProxyTransaction> proxies = transactions.stream()
88                 .map(TransactionTester::getTransaction)
89                 .collect(Collectors.toList());
90         proxies.forEach(AbstractProxyTransaction::seal);
91         cohort = new ClientTransactionCommitCohort(history, TRANSACTION_ID, proxies);
92     }
93
94     @After
95     public void tearDown() {
96         TestKit.shutdownActorSystem(system);
97     }
98
99     @Test
100     public void testCanCommit() throws Exception {
101         testOpSuccess(ClientTransactionCommitCohort::canCommit, this::expectCanCommit,
102                 this::replyCanCommitSuccess, Boolean.TRUE);
103     }
104
105     @Test
106     public void testCanCommitFail() throws Exception {
107         testOpFail(ClientTransactionCommitCohort::canCommit, this::expectCanCommit, this::replyCanCommitSuccess);
108     }
109
110     @Test
111     public void testPreCommit() throws Exception {
112         testOpSuccess(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess,
113             Empty.value());
114     }
115
116     @Test
117     public void testPreCommitFail() throws Exception {
118         testOpFail(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess);
119     }
120
121     @Test
122     public void testCommit() throws Exception {
123         testOpSuccess(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess,
124             CommitInfo.empty());
125     }
126
127     @Test
128     public void testCommitFail() throws Exception {
129         testOpFail(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess);
130     }
131
132     @Test
133     public void testAbort() throws Exception {
134         testOpSuccess(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess, Empty.value());
135     }
136
137     @Test
138     public void testAbortFail() throws Exception {
139         testOpFail(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess);
140     }
141
142     private void expectCanCommit(final TransactionTester<RemoteProxyTransaction> tester) {
143         final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
144         assertEquals(Optional.of(PersistenceProtocol.THREE_PHASE), request.getPersistenceProtocol());
145     }
146
147     void expectPreCommit(final TransactionTester<?> tester) {
148         tester.expectTransactionRequest(TransactionPreCommitRequest.class);
149     }
150
151     void expectCommit(final TransactionTester<?> tester) {
152         tester.expectTransactionRequest(TransactionDoCommitRequest.class);
153     }
154
155     void expectAbort(final TransactionTester<?> tester) {
156         tester.expectTransactionRequest(TransactionAbortRequest.class);
157     }
158
159     void replyCanCommitSuccess(final TransactionTester<?> tester) {
160         final RequestSuccess<?, ?> success = new TransactionCanCommitSuccess(tester.getTransaction().getIdentifier(),
161                 tester.getLastReceivedMessage().getSequence());
162         tester.replySuccess(success);
163     }
164
165     void replyPreCommitSuccess(final TransactionTester<?> tester) {
166         final RequestSuccess<?, ?> success = new TransactionPreCommitSuccess(tester.getTransaction().getIdentifier(),
167                 tester.getLastReceivedMessage().getSequence());
168         tester.replySuccess(success);
169     }
170
171     void replyCommitSuccess(final TransactionTester<?> tester) {
172         final RequestSuccess<?, ?> success = new TransactionCommitSuccess(tester.getTransaction().getIdentifier(),
173                 tester.getLastReceivedMessage().getSequence());
174         tester.replySuccess(success);
175     }
176
177     void replyAbortSuccess(final TransactionTester<?> tester) {
178         final RequestSuccess<?, ?> success = new TransactionAbortSuccess(tester.getTransaction().getIdentifier(),
179                 tester.getLastReceivedMessage().getSequence());
180         tester.replySuccess(success);
181     }
182
183     private static TransactionTester<RemoteProxyTransaction> createTransactionTester(final TestProbe backendProbe,
184                                                              final ClientActorContext context,
185                                                              final AbstractClientHistory history) {
186         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.current(),
187                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
188         final AbstractClientConnection<ShardBackendInfo> connection =
189                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
190         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
191         final RemoteProxyTransaction transaction =
192                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
193         return new TransactionTester<>(transaction, connection, backendProbe);
194     }
195
196     private static <T extends TransactionTester<?>> void replySuccess(final Collection<T> transactions,
197                               final Consumer<T> expect, final Consumer<T> reply) {
198         for (final T transaction : transactions) {
199             expect.accept(transaction);
200             reply.accept(transaction);
201         }
202     }
203
204     /**
205      * Test operation success. Invokes given operation, which initiates message to the backend.
206      * Received message is checked by expectFunction. Then replyFunction is invoked. Expected result is compared
207      * to the operation future result.
208      *
209      * @param operation      operation
210      * @param expectFunction expected message check
211      * @param replyFunction  response function
212      * @param expectedResult expected operation result
213      * @param <T>            type
214      * @throws Exception unexpected exception
215      */
216     private <T> void testOpSuccess(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
217                                    final Consumer<TransactionTester<RemoteProxyTransaction>> expectFunction,
218                                    final Consumer<TransactionTester<RemoteProxyTransaction>> replyFunction,
219                                    final T expectedResult) throws Exception {
220         final ListenableFuture<T> result = operation.apply(cohort);
221         replySuccess(transactions, expectFunction, replyFunction);
222         assertEquals(expectedResult, getWithTimeout(result));
223     }
224
225     /**
226      * Test operation failure. Invokes given operation, which initiates message to the backend.
227      * Received message is checked by expectFunction. Then replyFunction is invoked. One of the transactions in
228      * cohort receives failure response.
229      *
230      * @param operation      operation
231      * @param expectFunction expected message check
232      * @param replyFunction  response function
233      * @param <T>            type
234      * @throws Exception unexpected exception
235      */
236     private <T> void testOpFail(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
237             final Consumer<TransactionTester<RemoteProxyTransaction>> expectFunction,
238             final Consumer<TransactionTester<RemoteProxyTransaction>> replyFunction) throws Exception {
239         final ListenableFuture<T> canCommit = operation.apply(cohort);
240         //reply success to all except last transaction
241         replySuccess(transactions.subList(0, transactions.size() - 1), expectFunction, replyFunction);
242         //reply fail to last transaction
243         final TransactionTester<RemoteProxyTransaction> last = transactions.get(transactions.size() - 1);
244         expectFunction.accept(last);
245         final RuntimeException e = new RuntimeException();
246         final RuntimeRequestException cause = new RuntimeRequestException("fail", e);
247         last.replyFailure(cause);
248         //check future fail
249         final ExecutionException exception =
250                 assertOperationThrowsException(() -> getWithTimeout(canCommit), ExecutionException.class);
251         assertEquals(e, exception.getCause());
252     }
253
254 }