/* * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.databroker.actors.dds; import static org.hamcrest.CoreMatchers.both; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.core.Is.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import akka.testkit.TestProbe; import com.google.common.primitives.UnsignedLong; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Consumer; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.AccessClientUtil; import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; public abstract class AbstractProxyTransactionTest { protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID; private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID; private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID; protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder() .node(QName.create("ns-1", "node-1")) .build(); protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder() .node(QName.create("ns-1", "node-2")) .build(); protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder() .node(QName.create("ns-1", "node-3")) .build(); protected static final ContainerNode DATA_1 = Builders.containerBuilder() .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType())) .build(); protected static final ContainerNode DATA_2 = Builders.containerBuilder() .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType())) .build(); protected static final String PERSISTENCE_ID = "per-1"; @Mock private DataTreeSnapshot snapshot; @Mock private AbstractClientHistory history; private ActorSystem system; private TestProbe backendProbe; private TestProbe clientContextProbe; private TransactionTester tester; protected T transaction; @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); system = ActorSystem.apply(); clientContextProbe = new TestProbe(system, "clientContext"); backendProbe = new TestProbe(system, "backend"); final ClientActorContext context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID); final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, "default", UnsignedLong.ZERO, Optional.empty(), 3); final AbstractClientConnection connection = AccessClientUtil.createConnectedConnection(context, 0L, backend); final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID); transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot); tester = new TransactionTester<>(transaction, connection, backendProbe); } protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot); @After public void tearDown() throws Exception { JavaTestKit.shutdownActorSystem(system); } @Test public abstract void testExists() throws Exception; @Test public abstract void testRead() throws Exception; @Test public abstract void testWrite() throws Exception; @Test public abstract void testMerge() throws Exception; @Test public abstract void testDelete() throws Exception; @Test public abstract void testDirectCommit() throws Exception; @Test public abstract void testCanCommit() throws Exception; @Test public abstract void testPreCommit() throws Exception; @Test public abstract void testDoCommit() throws Exception; @Test public abstract void testForwardToRemoteAbort() throws Exception; @Test public abstract void testForwardToRemoteCommit() throws Exception; @Test public void testAbortVotingFuture() throws Exception { testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new); } @Test public void testForwardToRemotePurge() throws Exception { final TestProbe probe = new TestProbe(system); final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref()); testForwardToRemote(request, TransactionPurgeRequest.class); } @Test public void testReplayMessages() throws Exception { final TestProbe probe = new TestProbe(system); final List entries = new ArrayList<>(); final Consumer> callback = createCallbackMock(); final ReadTransactionRequest request1 = new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true); final ExistsTransactionRequest request2 = new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true); entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L)); entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L)); final TransactionTester successor = createRemoteProxyTransactionTester(); final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref()); transaction.recordSuccessfulRequest(successful1); final ReadTransactionRequest successful2 = new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true); transaction.recordSuccessfulRequest(successful2); transaction.startReconnect(); transaction.replayMessages(successor.getTransaction(), entries); Assert.assertEquals(successful1, successor.expectTransactionRequest(AbortLocalTransactionRequest.class)); Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class)); Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class)); Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class)); } protected void checkModifications(final ModifyTransactionRequest modifyRequest) { final List modifications = modifyRequest.getModifications(); Assert.assertEquals(3, modifications.size()); Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and((hasPath(PATH_1))))); Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and((hasPath(PATH_2))))); Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and((hasPath(PATH_3))))); } protected void testRequestResponse(final Consumer consumer, final Class expectedRequest, final BiFunction replySupplier) throws Exception { final TransactionTester tester = getTester(); final VotingFuture future = mock(VotingFuture.class); transaction.seal(); consumer.accept(future); final TransactionRequest req = tester.expectTransactionRequest(expectedRequest); tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence())); verify(future).voteYes(); } protected T testHandleForwardedRemoteRequest(final T request) throws Exception { transaction.handleForwardedRemoteRequest(request, createCallbackMock()); final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class); final T received = (T) envelope.getMessage(); Assert.assertTrue(received.getClass().equals(request.getClass())); Assert.assertEquals(TRANSACTION_ID, received.getTarget()); Assert.assertEquals(clientContextProbe.ref(), received.getReplyTo()); return received; } protected T testForwardToRemote(final TransactionRequest toForward, final Class expectedMessageClass) { final Consumer> callback = createCallbackMock(); final TransactionTester transactionTester = createRemoteProxyTransactionTester(); final RemoteProxyTransaction successor = transactionTester.getTransaction(); transaction.forwardToRemote(successor, toForward, callback); return transactionTester.expectTransactionRequest(expectedMessageClass); } protected TransactionTester getTester() { return tester; } @SuppressWarnings("unchecked") protected Consumer createCallbackMock() { return (Consumer) mock(Consumer.class); } protected static BaseMatcher hasPath(final YangInstanceIdentifier path) { return new BaseMatcher() { @Override public boolean matches(final Object item) { return path.equals(((TransactionModification) item).getPath()); } @Override public void describeTo(final Description description) { description.appendValue(path); } @Override public void describeMismatch(final Object item, final Description description) { final TransactionModification modification = (TransactionModification) item; description.appendText("was ").appendValue(modification.getPath()); } }; } protected TestProbe createProbe() { return new TestProbe(system); } protected TransactionTester createLocalProxy() { final TestProbe backendProbe = new TestProbe(system, "backend2"); final TestProbe clientContextProbe = new TestProbe(system, "clientContext2"); final ClientActorContext context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID); final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, "default", UnsignedLong.ZERO, Optional.empty(), 3); final AbstractClientConnection connection = AccessClientUtil.createConnectedConnection(context, 0L, backend); final AbstractClientHistory history = mock(AbstractClientHistory.class); final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID); final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class); when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class)); final LocalReadWriteProxyTransaction tx = new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot); return new TransactionTester<>(tx, connection, backendProbe); } protected TransactionTester createRemoteProxyTransactionTester() { final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext"); final TestProbe backendProbe = new TestProbe(system, "remoteBackend"); final AbstractClientHistory history = mock(AbstractClientHistory.class); final ClientActorContext context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID); final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, "default", UnsignedLong.ZERO, Optional.empty(), 5); final AbstractClientConnection connection = AccessClientUtil.createConnectedConnection(context, 0L, backend); final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID); final RemoteProxyTransaction transaction = new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false); return new TransactionTester<>(transaction, connection, backendProbe); } }