f9eb6ed6aff1b260e53341a5b454a6a53466c943
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / databroker / actors / dds / ClientTransactionCommitCohortTest.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
9
10 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
11 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
12 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
13 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
14 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
15
16 import akka.actor.ActorSystem;
17 import akka.testkit.JavaTestKit;
18 import akka.testkit.TestProbe;
19 import com.google.common.primitives.UnsignedLong;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.concurrent.ExecutionException;
26 import java.util.function.Consumer;
27 import java.util.function.Function;
28 import java.util.stream.Collectors;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.mockito.Mock;
34 import org.mockito.MockitoAnnotations;
35 import org.opendaylight.controller.cluster.access.ABIVersion;
36 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
37 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
38 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
39 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
40 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
41 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
42 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
43 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
44 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
45 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
46 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
47 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
48 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
49 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
50
51 public class ClientTransactionCommitCohortTest {
52
53     private static final String PERSISTENCE_ID = "per-1";
54     private static final int TRANSACTIONS = 3;
55
56     @Mock
57     private AbstractClientHistory history;
58     private ActorSystem system;
59     private List<TransactionTester<RemoteProxyTransaction>> transactions;
60     private ClientTransactionCommitCohort cohort;
61
62     @Before
63     public void setUp() throws Exception {
64         MockitoAnnotations.initMocks(this);
65         system = ActorSystem.apply();
66         final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
67         final ClientActorContext context =
68                 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
69         transactions = new ArrayList<>();
70         for (int i = 0; i < TRANSACTIONS; i++) {
71             transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
72         }
73         final Collection<AbstractProxyTransaction> proxies = transactions.stream()
74                 .map(TransactionTester::getTransaction)
75                 .collect(Collectors.toList());
76         proxies.forEach(AbstractProxyTransaction::seal);
77         cohort = new ClientTransactionCommitCohort(history, TRANSACTION_ID, proxies);
78     }
79
80     @After
81     public void tearDown() throws Exception {
82         JavaTestKit.shutdownActorSystem(system);
83     }
84
85     @Test
86     public void testCanCommit() throws Exception {
87         testOpSuccess(ClientTransactionCommitCohort::canCommit, this::expectCanCommit,
88                 this::replyCanCommitSuccess, true);
89     }
90
91     @Test
92     public void testCanCommitFail() throws Exception {
93         testOpFail(ClientTransactionCommitCohort::canCommit, this::expectCanCommit, this::replyCanCommitSuccess);
94     }
95
96     @Test
97     public void testPreCommit() throws Exception {
98         testOpSuccess(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess,
99                 null);
100     }
101
102     @Test
103     public void testPreCommitFail() throws Exception {
104         testOpFail(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess);
105     }
106
107     @Test
108     public void testCommit() throws Exception {
109         testOpSuccess(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess, null);
110     }
111
112     @Test
113     public void testCommitFail() throws Exception {
114         testOpFail(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess);
115     }
116
117     @Test
118     public void testAbort() throws Exception {
119         testOpSuccess(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess, null);
120     }
121
122     @Test
123     public void testAbortFail() throws Exception {
124         testOpFail(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess);
125     }
126
127     private void expectCanCommit(final TransactionTester<RemoteProxyTransaction> tester) {
128         final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
129         Assert.assertTrue(request.getPersistenceProtocol().isPresent());
130         Assert.assertEquals(PersistenceProtocol.THREE_PHASE, request.getPersistenceProtocol().get());
131     }
132
133     void expectPreCommit(final TransactionTester<?> tester) {
134         tester.expectTransactionRequest(TransactionPreCommitRequest.class);
135     }
136
137     void expectCommit(final TransactionTester<?> tester) {
138         tester.expectTransactionRequest(TransactionDoCommitRequest.class);
139     }
140
141     void expectAbort(final TransactionTester<?> tester) {
142         tester.expectTransactionRequest(TransactionAbortRequest.class);
143     }
144
145     void replyCanCommitSuccess(final TransactionTester<?> tester) {
146         final RequestSuccess<?, ?> success = new TransactionCanCommitSuccess(tester.getTransaction().getIdentifier(),
147                 tester.getLastReceivedMessage().getSequence());
148         tester.replySuccess(success);
149     }
150
151     void replyPreCommitSuccess(final TransactionTester<?> tester) {
152         final RequestSuccess<?, ?> success = new TransactionPreCommitSuccess(tester.getTransaction().getIdentifier(),
153                 tester.getLastReceivedMessage().getSequence());
154         tester.replySuccess(success);
155     }
156
157     void replyCommitSuccess(final TransactionTester<?> tester) {
158         final RequestSuccess<?, ?> success = new TransactionCommitSuccess(tester.getTransaction().getIdentifier(),
159                 tester.getLastReceivedMessage().getSequence());
160         tester.replySuccess(success);
161     }
162
163     void replyAbortSuccess(final TransactionTester<?> tester) {
164         final RequestSuccess<?, ?> success = new TransactionAbortSuccess(tester.getTransaction().getIdentifier(),
165                 tester.getLastReceivedMessage().getSequence());
166         tester.replySuccess(success);
167     }
168
169     private static TransactionTester<RemoteProxyTransaction> createTransactionTester(final TestProbe backendProbe,
170                                                              final ClientActorContext context,
171                                                              final AbstractClientHistory history) {
172         final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
173                 "default", UnsignedLong.ZERO, Optional.empty(), 3);
174         final AbstractClientConnection<ShardBackendInfo> connection =
175                 AccessClientUtil.createConnectedConnection(context, 0L, backend);
176         final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
177         final RemoteProxyTransaction transaction =
178                 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
179         return new TransactionTester<>(transaction, connection, backendProbe);
180     }
181
182     private static <T extends TransactionTester<?>> void replySuccess(final Collection<T> transactions,
183                               final Consumer<T> expect, final Consumer<T> reply) {
184         for (final T transaction : transactions) {
185             expect.accept(transaction);
186             reply.accept(transaction);
187         }
188     }
189
190     /**
191      * Test operation success. Invokes given operation, which initiates message to the backend.
192      * Received message is checked by expectFunction. Then replyFunction is invoked. Expected result is compared
193      * to the operation future result.
194      *
195      * @param operation      operation
196      * @param expectFunction expected message check
197      * @param replyFunction  response function
198      * @param expectedResult expected operation result
199      * @param <T>            type
200      * @throws Exception unexpected exception
201      */
202     private <T> void testOpSuccess(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
203                                    final Consumer<TransactionTester<RemoteProxyTransaction>> expectFunction,
204                                    final Consumer<TransactionTester<RemoteProxyTransaction>> replyFunction,
205                                    final T expectedResult) throws Exception {
206         final ListenableFuture<T> result = operation.apply(cohort);
207         replySuccess(transactions, expectFunction, replyFunction);
208         Assert.assertEquals(expectedResult, getWithTimeout(result));
209     }
210
211     /**
212      * Test operation failure. Invokes given operation, which initiates message to the backend.
213      * Received message is checked by expectFunction. Then replyFunction is invoked. One of the transactions in
214      * cohort receives failure response.
215      *
216      * @param operation      operation
217      * @param expectFunction expected message check
218      * @param replyFunction  response function
219      * @param <T>            type
220      * @throws Exception unexpected exception
221      */
222     private <T> void testOpFail(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
223             final Consumer<TransactionTester<RemoteProxyTransaction>> expectFunction,
224             final Consumer<TransactionTester<RemoteProxyTransaction>> replyFunction) throws Exception {
225         final ListenableFuture<T> canCommit = operation.apply(cohort);
226         //reply success to all except last transaction
227         replySuccess(transactions.subList(0, transactions.size() - 1), expectFunction, replyFunction);
228         //reply fail to last transaction
229         final TransactionTester<RemoteProxyTransaction> last = transactions.get(transactions.size() - 1);
230         expectFunction.accept(last);
231         final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException());
232         last.replyFailure(cause);
233         //check future fail
234         final ExecutionException exception =
235                 assertOperationThrowsException(() -> getWithTimeout(canCommit), ExecutionException.class);
236         Assert.assertEquals(cause, exception.getCause());
237     }
238
239 }