Use Empty instead of Void in cohorts
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / ClientTransactionCommitCohortTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
12 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
13 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
14 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
15 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
16
17 import akka.actor.ActorSystem;
18 import akka.testkit.TestProbe;
19 import akka.testkit.javadsl.TestKit;
20 import com.google.common.primitives.UnsignedLong;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.function.Consumer;
28 import java.util.function.Function;
29 import java.util.stream.Collectors;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.junit.runner.RunWith;
34 import org.mockito.Mock;
35 import org.mockito.junit.MockitoJUnitRunner;
36 import org.opendaylight.controller.cluster.access.ABIVersion;
37 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
38 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
39 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
40 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
41 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
42 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
43 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
44 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
45 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
46 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
47 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
48 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
49 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
50 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
51 import org.opendaylight.mdsal.common.api.CommitInfo;
52 import org.opendaylight.yangtools.yang.common.Empty;
53
54 @RunWith(MockitoJUnitRunner.StrictStubs.class)
55 public class ClientTransactionCommitCohortTest {
56
57     private static final String PERSISTENCE_ID = "per-1";
58     private static final int TRANSACTIONS = 3;
59
60     @Mock
61     private AbstractClientHistory history;
62     private ActorSystem system;
63     private List<TransactionTester<RemoteProxyTransaction>> transactions;
64     private ClientTransactionCommitCohort cohort;
65
66     @Before
67     public void setUp() {
68         system = ActorSystem.apply();
69         final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
70         final ClientActorContext context =
71                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
72         transactions = new ArrayList<>();
73         for (int i = 0; i < TRANSACTIONS; i++) {
74             transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
75         }
76         final Collection<AbstractProxyTransaction> proxies = transactions.stream()
77                 .map(TransactionTester::getTransaction)
78                 .collect(Collectors.toList());
79         proxies.forEach(AbstractProxyTransaction::seal);
80         cohort = new ClientTransactionCommitCohort(history, TRANSACTION_ID, proxies);
81     }
82
83     @After
84     public void tearDown() {
85         TestKit.shutdownActorSystem(system);
86     }
87
88     @Test
89     public void testCanCommit() throws Exception {
90         testOpSuccess(ClientTransactionCommitCohort::canCommit, this::expectCanCommit,
91                 this::replyCanCommitSuccess, Boolean.TRUE);
92     }
93
94     @Test
95     public void testCanCommitFail() throws Exception {
96         testOpFail(ClientTransactionCommitCohort::canCommit, this::expectCanCommit, this::replyCanCommitSuccess);
97     }
98
99     @Test
100     public void testPreCommit() throws Exception {
101         testOpSuccess(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess,
102             Empty.value());
103     }
104
105     @Test
106     public void testPreCommitFail() throws Exception {
107         testOpFail(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess);
108     }
109
110     @Test
111     public void testCommit() throws Exception {
112         testOpSuccess(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess,
113             CommitInfo.empty());
114     }
115
116     @Test
117     public void testCommitFail() throws Exception {
118         testOpFail(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess);
119     }
120
121     @Test
122     public void testAbort() throws Exception {
123         testOpSuccess(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess, Empty.value());
124     }
125
126     @Test
127     public void testAbortFail() throws Exception {
128         testOpFail(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess);
129     }
130
131     private void expectCanCommit(final TransactionTester<RemoteProxyTransaction> tester) {
132         final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
133         assertEquals(Optional.of(PersistenceProtocol.THREE_PHASE), request.getPersistenceProtocol());
134     }
135
136     void expectPreCommit(final TransactionTester<?> tester) {
137         tester.expectTransactionRequest(TransactionPreCommitRequest.class);
138     }
139
140     void expectCommit(final TransactionTester<?> tester) {
141         tester.expectTransactionRequest(TransactionDoCommitRequest.class);
142     }
143
144     void expectAbort(final TransactionTester<?> tester) {
145         tester.expectTransactionRequest(TransactionAbortRequest.class);
146     }
147
148     void replyCanCommitSuccess(final TransactionTester<?> tester) {
149         final RequestSuccess<?, ?> success = new TransactionCanCommitSuccess(tester.getTransaction().getIdentifier(),
150                 tester.getLastReceivedMessage().getSequence());
151         tester.replySuccess(success);
152     }
153
154     void replyPreCommitSuccess(final TransactionTester<?> tester) {
155         final RequestSuccess<?, ?> success = new TransactionPreCommitSuccess(tester.getTransaction().getIdentifier(),
156                 tester.getLastReceivedMessage().getSequence());
157         tester.replySuccess(success);
158     }
159
160     void replyCommitSuccess(final TransactionTester<?> tester) {
161         final RequestSuccess<?, ?> success = new TransactionCommitSuccess(tester.getTransaction().getIdentifier(),
162                 tester.getLastReceivedMessage().getSequence());
163         tester.replySuccess(success);
164     }
165
166     void replyAbortSuccess(final TransactionTester<?> tester) {
167         final RequestSuccess<?, ?> success = new TransactionAbortSuccess(tester.getTransaction().getIdentifier(),
168                 tester.getLastReceivedMessage().getSequence());
169         tester.replySuccess(success);
170     }
171
172     private static TransactionTester<RemoteProxyTransaction> createTransactionTester(final TestProbe backendProbe,
173                                                              final ClientActorContext context,
174                                                              final AbstractClientHistory history) {
175         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
176                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
177         final AbstractClientConnection<ShardBackendInfo> connection =
178                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
179         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
180         final RemoteProxyTransaction transaction =
181                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
182         return new TransactionTester<>(transaction, connection, backendProbe);
183     }
184
185     private static <T extends TransactionTester<?>> void replySuccess(final Collection<T> transactions,
186                               final Consumer<T> expect, final Consumer<T> reply) {
187         for (final T transaction : transactions) {
188             expect.accept(transaction);
189             reply.accept(transaction);
190         }
191     }
192
193     /**
194      * Test operation success. Invokes given operation, which initiates message to the backend.
195      * Received message is checked by expectFunction. Then replyFunction is invoked. Expected result is compared
196      * to the operation future result.
197      *
198      * @param operation      operation
199      * @param expectFunction expected message check
200      * @param replyFunction  response function
201      * @param expectedResult expected operation result
202      * @param <T>            type
203      * @throws Exception unexpected exception
204      */
205     private <T> void testOpSuccess(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
206                                    final Consumer<TransactionTester<RemoteProxyTransaction>> expectFunction,
207                                    final Consumer<TransactionTester<RemoteProxyTransaction>> replyFunction,
208                                    final T expectedResult) throws Exception {
209         final ListenableFuture<T> result = operation.apply(cohort);
210         replySuccess(transactions, expectFunction, replyFunction);
211         assertEquals(expectedResult, getWithTimeout(result));
212     }
213
214     /**
215      * Test operation failure. Invokes given operation, which initiates message to the backend.
216      * Received message is checked by expectFunction. Then replyFunction is invoked. One of the transactions in
217      * cohort receives failure response.
218      *
219      * @param operation      operation
220      * @param expectFunction expected message check
221      * @param replyFunction  response function
222      * @param <T>            type
223      * @throws Exception unexpected exception
224      */
225     private <T> void testOpFail(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
226             final Consumer<TransactionTester<RemoteProxyTransaction>> expectFunction,
227             final Consumer<TransactionTester<RemoteProxyTransaction>> replyFunction) throws Exception {
228         final ListenableFuture<T> canCommit = operation.apply(cohort);
229         //reply success to all except last transaction
230         replySuccess(transactions.subList(0, transactions.size() - 1), expectFunction, replyFunction);
231         //reply fail to last transaction
232         final TransactionTester<RemoteProxyTransaction> last = transactions.get(transactions.size() - 1);
233         expectFunction.accept(last);
234         final RuntimeException e = new RuntimeException();
235         final RuntimeRequestException cause = new RuntimeRequestException("fail", e);
236         last.replyFailure(cause);
237         //check future fail
238         final ExecutionException exception =
239                 assertOperationThrowsException(() -> getWithTimeout(canCommit), ExecutionException.class);
240         assertEquals(e, exception.getCause());
241     }
242
243 }