2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static org.junit.Assert.assertEquals;
11 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
12 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
13 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
14 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
15 import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
17 import akka.actor.ActorSystem;
18 import akka.testkit.TestProbe;
19 import akka.testkit.javadsl.TestKit;
20 import com.google.common.primitives.UnsignedLong;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.List;
25 import java.util.Optional;
26 import java.util.concurrent.ExecutionException;
27 import java.util.function.Consumer;
28 import java.util.function.Function;
29 import java.util.stream.Collectors;
30 import org.junit.After;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.junit.runner.RunWith;
34 import org.mockito.Mock;
35 import org.mockito.junit.MockitoJUnitRunner;
36 import org.opendaylight.controller.cluster.access.ABIVersion;
37 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
38 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
39 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
40 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
41 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
42 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
43 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
44 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
45 import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
46 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
47 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
48 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
49 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
50 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
51 import org.opendaylight.mdsal.common.api.CommitInfo;
52 import org.opendaylight.yangtools.yang.common.Empty;
54 @RunWith(MockitoJUnitRunner.StrictStubs.class)
55 public class ClientTransactionCommitCohortTest {
57 private static final String PERSISTENCE_ID = "per-1";
58 private static final int TRANSACTIONS = 3;
61 private AbstractClientHistory history;
62 private ActorSystem system;
63 private List<TransactionTester<RemoteProxyTransaction>> transactions;
64 private ClientTransactionCommitCohort cohort;
68 system = ActorSystem.apply();
69 final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
70 final ClientActorContext context =
71 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
72 transactions = new ArrayList<>();
73 for (int i = 0; i < TRANSACTIONS; i++) {
74 transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
76 final Collection<AbstractProxyTransaction> proxies = transactions.stream()
77 .map(TransactionTester::getTransaction)
78 .collect(Collectors.toList());
79 proxies.forEach(AbstractProxyTransaction::seal);
80 cohort = new ClientTransactionCommitCohort(history, TRANSACTION_ID, proxies);
84 public void tearDown() {
85 TestKit.shutdownActorSystem(system);
89 public void testCanCommit() throws Exception {
90 testOpSuccess(ClientTransactionCommitCohort::canCommit, this::expectCanCommit,
91 this::replyCanCommitSuccess, Boolean.TRUE);
95 public void testCanCommitFail() throws Exception {
96 testOpFail(ClientTransactionCommitCohort::canCommit, this::expectCanCommit, this::replyCanCommitSuccess);
100 public void testPreCommit() throws Exception {
101 testOpSuccess(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess,
106 public void testPreCommitFail() throws Exception {
107 testOpFail(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess);
111 public void testCommit() throws Exception {
112 testOpSuccess(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess,
117 public void testCommitFail() throws Exception {
118 testOpFail(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess);
122 public void testAbort() throws Exception {
123 testOpSuccess(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess, Empty.value());
127 public void testAbortFail() throws Exception {
128 testOpFail(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess);
131 private void expectCanCommit(final TransactionTester<RemoteProxyTransaction> tester) {
132 final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
133 assertEquals(Optional.of(PersistenceProtocol.THREE_PHASE), request.getPersistenceProtocol());
136 void expectPreCommit(final TransactionTester<?> tester) {
137 tester.expectTransactionRequest(TransactionPreCommitRequest.class);
140 void expectCommit(final TransactionTester<?> tester) {
141 tester.expectTransactionRequest(TransactionDoCommitRequest.class);
144 void expectAbort(final TransactionTester<?> tester) {
145 tester.expectTransactionRequest(TransactionAbortRequest.class);
148 void replyCanCommitSuccess(final TransactionTester<?> tester) {
149 final RequestSuccess<?, ?> success = new TransactionCanCommitSuccess(tester.getTransaction().getIdentifier(),
150 tester.getLastReceivedMessage().getSequence());
151 tester.replySuccess(success);
154 void replyPreCommitSuccess(final TransactionTester<?> tester) {
155 final RequestSuccess<?, ?> success = new TransactionPreCommitSuccess(tester.getTransaction().getIdentifier(),
156 tester.getLastReceivedMessage().getSequence());
157 tester.replySuccess(success);
160 void replyCommitSuccess(final TransactionTester<?> tester) {
161 final RequestSuccess<?, ?> success = new TransactionCommitSuccess(tester.getTransaction().getIdentifier(),
162 tester.getLastReceivedMessage().getSequence());
163 tester.replySuccess(success);
166 void replyAbortSuccess(final TransactionTester<?> tester) {
167 final RequestSuccess<?, ?> success = new TransactionAbortSuccess(tester.getTransaction().getIdentifier(),
168 tester.getLastReceivedMessage().getSequence());
169 tester.replySuccess(success);
172 private static TransactionTester<RemoteProxyTransaction> createTransactionTester(final TestProbe backendProbe,
173 final ClientActorContext context,
174 final AbstractClientHistory history) {
175 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
176 "default", UnsignedLong.ZERO, Optional.empty(), 3);
177 final AbstractClientConnection<ShardBackendInfo> connection =
178 AccessClientUtil.createConnectedConnection(context, 0L, backend);
179 final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
180 final RemoteProxyTransaction transaction =
181 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
182 return new TransactionTester<>(transaction, connection, backendProbe);
185 private static <T extends TransactionTester<?>> void replySuccess(final Collection<T> transactions,
186 final Consumer<T> expect, final Consumer<T> reply) {
187 for (final T transaction : transactions) {
188 expect.accept(transaction);
189 reply.accept(transaction);
194 * Test operation success. Invokes given operation, which initiates message to the backend.
195 * Received message is checked by expectFunction. Then replyFunction is invoked. Expected result is compared
196 * to the operation future result.
198 * @param operation operation
199 * @param expectFunction expected message check
200 * @param replyFunction response function
201 * @param expectedResult expected operation result
203 * @throws Exception unexpected exception
205 private <T> void testOpSuccess(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
206 final Consumer<TransactionTester<RemoteProxyTransaction>> expectFunction,
207 final Consumer<TransactionTester<RemoteProxyTransaction>> replyFunction,
208 final T expectedResult) throws Exception {
209 final ListenableFuture<T> result = operation.apply(cohort);
210 replySuccess(transactions, expectFunction, replyFunction);
211 assertEquals(expectedResult, getWithTimeout(result));
215 * Test operation failure. Invokes given operation, which initiates message to the backend.
216 * Received message is checked by expectFunction. Then replyFunction is invoked. One of the transactions in
217 * cohort receives failure response.
219 * @param operation operation
220 * @param expectFunction expected message check
221 * @param replyFunction response function
223 * @throws Exception unexpected exception
225 private <T> void testOpFail(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
226 final Consumer<TransactionTester<RemoteProxyTransaction>> expectFunction,
227 final Consumer<TransactionTester<RemoteProxyTransaction>> replyFunction) throws Exception {
228 final ListenableFuture<T> canCommit = operation.apply(cohort);
229 //reply success to all except last transaction
230 replySuccess(transactions.subList(0, transactions.size() - 1), expectFunction, replyFunction);
231 //reply fail to last transaction
232 final TransactionTester<RemoteProxyTransaction> last = transactions.get(transactions.size() - 1);
233 expectFunction.accept(last);
234 final RuntimeException e = new RuntimeException();
235 final RuntimeRequestException cause = new RuntimeRequestException("fail", e);
236 last.replyFailure(cause);
238 final ExecutionException exception =
239 assertOperationThrowsException(() -> getWithTimeout(canCommit), ExecutionException.class);
240 assertEquals(e, exception.getCause());