/* * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.databroker.actors.dds; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException; import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.ListenableFuture; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.AccessClientUtil; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; public class ClientTransactionCommitCohortTest { private static final String PERSISTENCE_ID = "per-1"; private static final int TRANSACTIONS = 3; @Mock private AbstractClientHistory history; private ActorSystem system; private List transactions; private ClientTransactionCommitCohort cohort; @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); system = ActorSystem.apply(); final TestProbe clientContextProbe = new TestProbe(system, "clientContext"); final ClientActorContext context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID); transactions = new ArrayList<>(); for (int i = 0; i < TRANSACTIONS; i++) { transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history)); } final Collection proxies = transactions.stream() .map(TransactionTester::getTransaction) .collect(Collectors.toList()); proxies.forEach(AbstractProxyTransaction::seal); cohort = new ClientTransactionCommitCohort(history, TRANSACTION_ID, proxies); } @After public void tearDown() throws Exception { JavaTestKit.shutdownActorSystem(system); } @Test public void testCanCommit() throws Exception { testOpSuccess(ClientTransactionCommitCohort::canCommit, this::expectCanCommit, this::replyCanCommitSuccess, true); } @Test public void testCanCommitFail() throws Exception { testOpFail(ClientTransactionCommitCohort::canCommit, this::expectCanCommit, this::replyCanCommitSuccess); } @Test public void testPreCommit() throws Exception { testOpSuccess(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess, null); } @Test public void testPreCommitFail() throws Exception { testOpFail(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess); } @Test public void testCommit() throws Exception { testOpSuccess(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess, null); } @Test public void testCommitFail() throws Exception { testOpFail(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess); } @Test public void testAbort() throws Exception { testOpSuccess(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess, null); } @Test public void testAbortFail() throws Exception { testOpFail(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess); } private void expectCanCommit(final TransactionTester tester) { final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class); Assert.assertTrue(request.getPersistenceProtocol().isPresent()); Assert.assertEquals(PersistenceProtocol.THREE_PHASE, request.getPersistenceProtocol().get()); } void expectPreCommit(final TransactionTester tester) { tester.expectTransactionRequest(TransactionPreCommitRequest.class); } void expectCommit(final TransactionTester tester) { tester.expectTransactionRequest(TransactionDoCommitRequest.class); } void expectAbort(final TransactionTester tester) { tester.expectTransactionRequest(TransactionAbortRequest.class); } void replyCanCommitSuccess(final TransactionTester tester) { final RequestSuccess success = new TransactionCanCommitSuccess(tester.getTransaction().getIdentifier(), tester.getLastReceivedMessage().getSequence()); tester.replySuccess(success); } void replyPreCommitSuccess(final TransactionTester tester) { final RequestSuccess success = new TransactionPreCommitSuccess(tester.getTransaction().getIdentifier(), tester.getLastReceivedMessage().getSequence()); tester.replySuccess(success); } void replyCommitSuccess(final TransactionTester tester) { final RequestSuccess success = new TransactionCommitSuccess(tester.getTransaction().getIdentifier(), tester.getLastReceivedMessage().getSequence()); tester.replySuccess(success); } void replyAbortSuccess(final TransactionTester tester) { final RequestSuccess success = new TransactionAbortSuccess(tester.getTransaction().getIdentifier(), tester.getLastReceivedMessage().getSequence()); tester.replySuccess(success); } private static TransactionTester createTransactionTester(final TestProbe backendProbe, final ClientActorContext context, final AbstractClientHistory history) { final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, "default", UnsignedLong.ZERO, Optional.empty(), 3); final AbstractClientConnection connection = AccessClientUtil.createConnectedConnection(context, 0L, backend); final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID); final RemoteProxyTransaction transaction = new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false); return new TransactionTester(transaction, connection, backendProbe); } private void replySuccess(final Collection transactions, final Consumer expect, final Consumer reply) { for (final TransactionTester transaction : transactions) { expect.accept(transaction); reply.accept(transaction); } } /** * Test operation success. Invokes given operation, which initiates message to the backend. * Received message is checked by expectFunction. Then replyFunction is invoked. Expected result is compared * to the operation future result. * * @param operation operation * @param expectFunction expected message check * @param replyFunction response function * @param expectedResult expected operation result * @param type * @throws Exception unexpected exception */ private void testOpSuccess(final Function> operation, final Consumer expectFunction, final Consumer replyFunction, final T expectedResult) throws Exception { final ListenableFuture result = operation.apply(cohort); replySuccess(transactions, expectFunction, replyFunction); Assert.assertEquals(expectedResult, getWithTimeout(result)); } /** * Test operation failure. Invokes given operation, which initiates message to the backend. * Received message is checked by expectFunction. Then replyFunction is invoked. One of the transactions in * cohort receives failure response. * * @param operation operation * @param expectFunction expected message check * @param replyFunction response function * @param type * @throws Exception unexpected exception */ private void testOpFail(final Function> operation, final Consumer expectFunction, final Consumer replyFunction) throws Exception { final ListenableFuture canCommit = operation.apply(cohort); //reply success to all except last transaction replySuccess(transactions.subList(0, transactions.size() - 1), expectFunction, replyFunction); //reply fail to last transaction final TransactionTester last = transactions.get(transactions.size() - 1); expectFunction.accept(last); final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException()); last.replyFailure(cause); //check future fail final ExecutionException exception = assertOperationThrowsException(() -> getWithTimeout(canCommit), ExecutionException.class); Assert.assertEquals(cause, exception.getCause()); } }