import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
/**
connection.receiveResponse(envelope);
}
+ public static ConnectionEntry createConnectionEntry(final Request<?, ?> request,
+ final Consumer<Response<?, ?>> callback,
+ final long now) {
+ return new ConnectionEntry(request, callback, now);
+ }
+
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.core.Is.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestProbe;
+import com.google.common.primitives.UnsignedLong;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+
+public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
+ protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
+ private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
+ private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
+
+ protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
+ .node(QName.create("ns-1", "node-1"))
+ .build();
+ protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
+ .node(QName.create("ns-1", "node-2"))
+ .build();
+ protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
+ .node(QName.create("ns-1", "node-3"))
+ .build();
+ protected static final ContainerNode DATA_1 = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
+ .build();
+ protected static final ContainerNode DATA_2 = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
+ .build();
+ protected static final String PERSISTENCE_ID = "per-1";
+
+ @Mock
+ private DataTreeSnapshot snapshot;
+ @Mock
+ private AbstractClientHistory history;
+ private ActorSystem system;
+ private TestProbe backendProbe;
+ private TestProbe clientContextProbe;
+ private TransactionTester<T> tester;
+ protected T transaction;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ system = ActorSystem.apply();
+ clientContextProbe = new TestProbe(system, "clientContext");
+ backendProbe = new TestProbe(system, "backend");
+ final ClientActorContext context =
+ AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+ final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+ "default", UnsignedLong.ZERO, Optional.empty(), 3);
+ final AbstractClientConnection<ShardBackendInfo> connection =
+ AccessClientUtil.createConnectedConnection(context, 0L, backend);
+ final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
+ transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
+ tester = new TransactionTester<>(transaction, connection, backendProbe);
+ }
+
+ protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
+
+ @After
+ public void tearDown() throws Exception {
+ JavaTestKit.shutdownActorSystem(system);
+ }
+
+ @Test
+ public abstract void testExists() throws Exception;
+
+ @Test
+ public abstract void testRead() throws Exception;
+
+ @Test
+ public abstract void testWrite() throws Exception;
+
+ @Test
+ public abstract void testMerge() throws Exception;
+
+ @Test
+ public abstract void testDelete() throws Exception;
+
+ @Test
+ public abstract void testDirectCommit() throws Exception;
+
+ @Test
+ public abstract void testCanCommit() throws Exception;
+
+ @Test
+ public abstract void testPreCommit() throws Exception;
+
+ @Test
+ public abstract void testDoCommit() throws Exception;
+
+ @Test
+ public abstract void testForwardToRemoteAbort() throws Exception;
+
+ @Test
+ public abstract void testForwardToRemoteCommit() throws Exception;
+
+ @Test
+ public void testAbortVotingFuture() throws Exception {
+ testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
+ }
+
+ @Test
+ public void testForwardToRemotePurge() throws Exception {
+ final TestProbe probe = new TestProbe(system);
+ final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
+ testForwardToRemote(request, TransactionPurgeRequest.class);
+ }
+
+ @Test
+ public void testReplayMessages() throws Exception {
+ final TestProbe probe = new TestProbe(system);
+ final List<ConnectionEntry> entries = new ArrayList<>();
+ final Consumer<Response<?, ?>> callback = createCallbackMock();
+ final ReadTransactionRequest request1 =
+ new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
+ final ExistsTransactionRequest request2 =
+ new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
+ entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
+ entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
+ final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
+ final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
+ transaction.recordSuccessfulRequest(successful1);
+ final ReadTransactionRequest successful2 =
+ new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+ transaction.recordSuccessfulRequest(successful2);
+ transaction.startReconnect();
+ transaction.replayMessages(successor.getTransaction(), entries);
+ Assert.assertEquals(successful1, successor.expectTransactionRequest(AbortLocalTransactionRequest.class));
+ Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
+ Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
+ Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
+ }
+
+ protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
+ final List<TransactionModification> modifications = modifyRequest.getModifications();
+ Assert.assertEquals(3, modifications.size());
+ Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and((hasPath(PATH_1)))));
+ Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and((hasPath(PATH_2)))));
+ Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and((hasPath(PATH_3)))));
+ }
+
+ protected void testRequestResponse(final Consumer<VotingFuture> consumer,
+ final Class<? extends TransactionRequest> expectedRequest,
+ final BiFunction<TransactionIdentifier, Long, TransactionSuccess> replySupplier)
+ throws Exception {
+ final TransactionTester<T> tester = getTester();
+ final VotingFuture future = mock(VotingFuture.class);
+ transaction.seal();
+ consumer.accept(future);
+ final TransactionRequest req = tester.expectTransactionRequest(expectedRequest);
+ tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
+ verify(future).voteYes();
+ }
+
+ protected <T extends TransactionRequest> T testHandleForwardedRemoteRequest(final T request) throws Exception {
+ transaction.handleForwardedRemoteRequest(request, createCallbackMock());
+ final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
+ final T received = (T) envelope.getMessage();
+ Assert.assertTrue(received.getClass().equals(request.getClass()));
+ Assert.assertEquals(TRANSACTION_ID, received.getTarget());
+ Assert.assertEquals(clientContextProbe.ref(), received.getReplyTo());
+ return received;
+ }
+
+ protected <T extends TransactionRequest> T testForwardToRemote(final TransactionRequest toForward,
+ final Class<T> expectedMessageClass) {
+ final Consumer<Response<?, ?>> callback = createCallbackMock();
+ final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
+ final RemoteProxyTransaction successor = transactionTester.getTransaction();
+ transaction.forwardToRemote(successor, toForward, callback);
+ return transactionTester.expectTransactionRequest(expectedMessageClass);
+ }
+
+ protected TransactionTester<T> getTester() {
+ return tester;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected <T> Consumer<T> createCallbackMock() {
+ return (Consumer<T>) mock(Consumer.class);
+ }
+
+ protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
+ return new BaseMatcher<TransactionModification>() {
+
+ @Override
+ public boolean matches(final Object item) {
+ return path.equals(((TransactionModification) item).getPath());
+ }
+
+ @Override
+ public void describeTo(final Description description) {
+ description.appendValue(path);
+ }
+
+ @Override
+ public void describeMismatch(final Object item, final Description description) {
+ final TransactionModification modification = (TransactionModification) item;
+ description.appendText("was ").appendValue(modification.getPath());
+ }
+ };
+ }
+
+ protected TestProbe createProbe() {
+ return new TestProbe(system);
+ }
+
+ protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
+ final TestProbe backendProbe = new TestProbe(system, "backend2");
+ final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
+ final ClientActorContext context =
+ AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+ final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+ "default", UnsignedLong.ZERO, Optional.empty(), 3);
+ final AbstractClientConnection<ShardBackendInfo> connection =
+ AccessClientUtil.createConnectedConnection(context, 0L, backend);
+ final AbstractClientHistory history = mock(AbstractClientHistory.class);
+ final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
+ final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
+ when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
+ final LocalReadWriteProxyTransaction tx =
+ new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
+ return new TransactionTester<>(tx, connection, backendProbe);
+ }
+
+ protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
+ final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
+ final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
+ final AbstractClientHistory history = mock(AbstractClientHistory.class);
+ final ClientActorContext context =
+ AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+ final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+ "default", UnsignedLong.ZERO, Optional.empty(), 5);
+ final AbstractClientConnection<ShardBackendInfo> connection =
+ AccessClientUtil.createConnectedConnection(context, 0L, backend);
+ final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
+ final RemoteProxyTransaction transaction =
+ new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+ return new TransactionTester<>(transaction, connection, backendProbe);
+ }
+}
@Mock
private AbstractClientHistory history;
private ActorSystem system;
- private List<TranasactionTester> transactions;
+ private List<TransactionTester> transactions;
private ClientTransactionCommitCohort cohort;
@Before
transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
}
final Collection<AbstractProxyTransaction> proxies = transactions.stream()
- .map(TranasactionTester::getTransaction)
+ .map(TransactionTester::getTransaction)
.collect(Collectors.toList());
proxies.forEach(AbstractProxyTransaction::seal);
cohort = new ClientTransactionCommitCohort(history, TRANSACTION_ID, proxies);
testOpFail(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess);
}
- private void expectCanCommit(final TranasactionTester tester) {
+ private void expectCanCommit(final TransactionTester<RemoteProxyTransaction> tester) {
final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
Assert.assertTrue(request.getPersistenceProtocol().isPresent());
Assert.assertEquals(PersistenceProtocol.THREE_PHASE, request.getPersistenceProtocol().get());
}
- void expectPreCommit(final TranasactionTester tester) {
+ void expectPreCommit(final TransactionTester tester) {
tester.expectTransactionRequest(TransactionPreCommitRequest.class);
}
- void expectCommit(final TranasactionTester tester) {
+ void expectCommit(final TransactionTester tester) {
tester.expectTransactionRequest(TransactionDoCommitRequest.class);
}
- void expectAbort(final TranasactionTester tester) {
+ void expectAbort(final TransactionTester tester) {
tester.expectTransactionRequest(TransactionAbortRequest.class);
}
- void replyCanCommitSuccess(final TranasactionTester tester) {
+ void replyCanCommitSuccess(final TransactionTester tester) {
final RequestSuccess<?, ?> success = new TransactionCanCommitSuccess(tester.getTransaction().getIdentifier(),
tester.getLastReceivedMessage().getSequence());
tester.replySuccess(success);
}
- void replyPreCommitSuccess(final TranasactionTester tester) {
+ void replyPreCommitSuccess(final TransactionTester tester) {
final RequestSuccess<?, ?> success = new TransactionPreCommitSuccess(tester.getTransaction().getIdentifier(),
tester.getLastReceivedMessage().getSequence());
tester.replySuccess(success);
}
- void replyCommitSuccess(final TranasactionTester tester) {
+ void replyCommitSuccess(final TransactionTester tester) {
final RequestSuccess<?, ?> success = new TransactionCommitSuccess(tester.getTransaction().getIdentifier(),
tester.getLastReceivedMessage().getSequence());
tester.replySuccess(success);
}
- void replyAbortSuccess(final TranasactionTester tester) {
+ void replyAbortSuccess(final TransactionTester tester) {
final RequestSuccess<?, ?> success = new TransactionAbortSuccess(tester.getTransaction().getIdentifier(),
tester.getLastReceivedMessage().getSequence());
tester.replySuccess(success);
}
- private static TranasactionTester createTransactionTester(final TestProbe backendProbe,
- final ClientActorContext context,
- final AbstractClientHistory history) {
+ private static TransactionTester createTransactionTester(final TestProbe backendProbe,
+ final ClientActorContext context,
+ final AbstractClientHistory history) {
final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
"default", UnsignedLong.ZERO, Optional.empty(), 3);
final AbstractClientConnection<ShardBackendInfo> connection =
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
- return new TranasactionTester(transaction, connection, backendProbe);
+ return new TransactionTester(transaction, connection, backendProbe);
}
- private void replySuccess(final Collection<TranasactionTester> transactions,
- final Consumer<TranasactionTester> expect,
- final Consumer<TranasactionTester> reply) {
- for (final TranasactionTester transaction : transactions) {
+ private void replySuccess(final Collection<TransactionTester> transactions,
+ final Consumer<TransactionTester> expect,
+ final Consumer<TransactionTester> reply) {
+ for (final TransactionTester transaction : transactions) {
expect.accept(transaction);
reply.accept(transaction);
}
* @throws Exception unexpected exception
*/
private <T> void testOpSuccess(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
- final Consumer<TranasactionTester> expectFunction,
- final Consumer<TranasactionTester> replyFunction,
+ final Consumer<TransactionTester> expectFunction,
+ final Consumer<TransactionTester> replyFunction,
final T expectedResult) throws Exception {
final ListenableFuture<T> result = operation.apply(cohort);
replySuccess(transactions, expectFunction, replyFunction);
* @throws Exception unexpected exception
*/
private <T> void testOpFail(final Function<ClientTransactionCommitCohort, ListenableFuture<T>> operation,
- final Consumer<TranasactionTester> expectFunction,
- final Consumer<TranasactionTester> replyFunction) throws Exception {
+ final Consumer<TransactionTester> expectFunction,
+ final Consumer<TransactionTester> replyFunction) throws Exception {
final ListenableFuture<T> canCommit = operation.apply(cohort);
//reply success to all except last transaction
replySuccess(transactions.subList(0, transactions.size() - 1), expectFunction, replyFunction);
//reply fail to last transaction
- final TranasactionTester last = transactions.get(transactions.size() - 1);
+ final TransactionTester last = transactions.get(transactions.size() - 1);
expectFunction.accept(last);
final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException());
last.replyFailure(cause);
@Mock
private AbstractClientHistory history;
private ActorSystem system;
- private TranasactionTester transaction;
+ private TransactionTester<?> transaction;
private DirectTransactionCommitCohort cohort;
@Before
Assert.assertNull(getWithTimeout(commit));
}
- private static TranasactionTester createTransactionTester(final TestProbe backendProbe,
- final ClientActorContext context,
- final AbstractClientHistory history) {
+ private static TransactionTester<?> createTransactionTester(final TestProbe backendProbe,
+ final ClientActorContext context,
+ final AbstractClientHistory history) {
final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
"default", UnsignedLong.ZERO, Optional.empty(), 3);
final AbstractClientConnection<ShardBackendInfo> connection =
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
- return new TranasactionTester(transaction, connection, backendProbe);
+ return new TransactionTester<>(transaction, connection, backendProbe);
}
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
+
+import akka.testkit.TestProbe;
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+
+public abstract class LocalProxyTransactionTest<T extends LocalProxyTransaction>
+ extends AbstractProxyTransactionTest<T> {
+
+ @Override
+ @Test
+ public void testExists() throws Exception {
+ assertFutureEquals(true, transaction.exists(PATH_1));
+ assertFutureEquals(false, transaction.exists(PATH_3));
+ }
+
+ @Override
+ @Test
+ public void testRead() throws Exception {
+ assertFutureEquals(com.google.common.base.Optional.of(DATA_1), transaction.read(PATH_1));
+ assertFutureEquals(com.google.common.base.Optional.absent(), transaction.read(PATH_3));
+ }
+
+ @Test
+ public void testDoAbort() throws Exception {
+ transaction.doAbort();
+ getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
+ }
+
+ @Test
+ public void testHandleForwardedRemoteReadRequest() throws Exception {
+ final TestProbe probe = createProbe();
+ final ReadTransactionRequest request =
+ new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+ final Consumer<Response<?, ?>> callback = createCallbackMock();
+ transaction.handleForwardedRemoteRequest(request, callback);
+ final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
+ verify(callback).accept(captor.capture());
+ final Response value = captor.getValue();
+ Assert.assertTrue(value instanceof ReadTransactionSuccess);
+ final ReadTransactionSuccess success = (ReadTransactionSuccess) value;
+ Assert.assertTrue(success.getData().isPresent());
+ Assert.assertEquals(DATA_1, success.getData().get());
+ }
+
+ @Test
+ public void testHandleForwardedRemoteExistsRequest() throws Exception {
+ final TestProbe probe = createProbe();
+ final ExistsTransactionRequest request =
+ new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+ final Consumer<Response<?, ?>> callback = createCallbackMock();
+ transaction.handleForwardedRemoteRequest(request, callback);
+ final ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
+ verify(callback).accept(captor.capture());
+ final Response value = captor.getValue();
+ Assert.assertTrue(value instanceof ExistsTransactionSuccess);
+ final ExistsTransactionSuccess success = (ExistsTransactionSuccess) value;
+ Assert.assertTrue(success.getExists());
+ }
+
+ @Test
+ public void testHandleForwardedRemotePurgeRequest() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionPurgeRequest request =
+ new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
+ testHandleForwardedRemoteRequest(request);
+ }
+
+ @Override
+ @Test
+ public void testForwardToRemoteAbort() throws Exception {
+ final TestProbe probe = createProbe();
+ final AbortLocalTransactionRequest request = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
+ final ModifyTransactionRequest modifyRequest = testForwardToRemote(request, ModifyTransactionRequest.class);
+ Assert.assertTrue(modifyRequest.getPersistenceProtocol().isPresent());
+ Assert.assertEquals(PersistenceProtocol.ABORT, modifyRequest.getPersistenceProtocol().get());
+ }
+
+ @Override
+ @Test
+ public void testForwardToRemoteCommit() throws Exception {
+ final TestProbe probe = createProbe();
+ final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
+ final CommitLocalTransactionRequest request =
+ new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), modification, true);
+ doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
+ final ModifyTransactionRequest modifyRequest = testForwardToRemote(request, ModifyTransactionRequest.class);
+ verify(modification).applyToCursor(any());
+ Assert.assertTrue(modifyRequest.getPersistenceProtocol().isPresent());
+ Assert.assertEquals(PersistenceProtocol.THREE_PHASE, modifyRequest.getPersistenceProtocol().get());
+ checkModifications(modifyRequest);
+ }
+
+ @Test
+ public void testForwardToLocalAbort() throws Exception {
+ final TestProbe probe = createProbe();
+ final AbortLocalTransactionRequest request = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
+ testForwardToLocal(request, AbortLocalTransactionRequest.class);
+ }
+
+ @Test
+ public void testForwardToLocalPurge() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
+ testForwardToLocal(request, TransactionPurgeRequest.class);
+ }
+
+ protected <T extends TransactionRequest> T testForwardToLocal(final TransactionRequest toForward,
+ final Class<T> expectedMessageClass) {
+ final Consumer<Response<?, ?>> callback = createCallbackMock();
+ final TransactionTester<LocalReadWriteProxyTransaction> transactionTester = createLocalProxy();
+ final LocalReadWriteProxyTransaction successor = transactionTester.getTransaction();
+ transaction.forwardToLocal(successor, toForward, callback);
+ return transactionTester.expectTransactionRequest(expectedMessageClass);
+ }
+
+ /**
+ * To emulate side effect of void method.
+ * {@link CursorAwareDataTreeModification#applyToCursor(DataTreeModificationCursor)}
+ *
+ * @param invocation invocation
+ * @return void - always null
+ */
+ protected Answer applyToCursorAnswer(final InvocationOnMock invocation) {
+ final DataTreeModificationCursor cursor =
+ invocation.getArgumentAt(0, DataTreeModificationCursor.class);
+ cursor.write(PATH_1.getLastPathArgument(), DATA_1);
+ cursor.merge(PATH_2.getLastPathArgument(), DATA_2);
+ cursor.delete(PATH_3.getLastPathArgument());
+ return null;
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.mockito.Mockito.when;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
+
+import akka.testkit.TestProbe;
+import com.google.common.base.VerifyException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+public class LocalReadOnlyProxyTransactionTest extends LocalProxyTransactionTest<LocalReadOnlyProxyTransaction> {
+
+ private DataTreeSnapshot snapshot;
+
+ @Override
+ protected LocalReadOnlyProxyTransaction createTransaction(final ProxyHistory parent,
+ final TransactionIdentifier id,
+ final DataTreeSnapshot snapshot) {
+ when(snapshot.readNode(PATH_1)).thenReturn(com.google.common.base.Optional.of(DATA_1));
+ when(snapshot.readNode(PATH_3)).thenReturn(com.google.common.base.Optional.absent());
+ this.snapshot = snapshot;
+ return new LocalReadOnlyProxyTransaction(parent, id, this.snapshot);
+ }
+
+ @Test
+ public void testIsSnapshotOnly() {
+ Assert.assertTrue(transaction.isSnapshotOnly());
+ }
+
+ @Test
+ public void testReadOnlyView() {
+ Assert.assertEquals(snapshot, transaction.readOnlyView());
+ }
+
+ @Override
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDirectCommit() throws Exception {
+ transaction.directCommit();
+ }
+
+ @Override
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCanCommit() throws Exception {
+ transaction.canCommit(new VotingFuture<>(new Object(), 1));
+ }
+
+ @Override
+ @Test(expected = UnsupportedOperationException.class)
+ public void testPreCommit() throws Exception {
+ transaction.preCommit(new VotingFuture<>(new Object(), 1));
+ }
+
+ @Override
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDoCommit() throws Exception {
+ transaction.doCommit(new VotingFuture<>(new Object(), 1));
+ }
+
+ @Override
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDelete() {
+ transaction.delete(PATH_1);
+ }
+
+ @Override
+ @Test(expected = UnsupportedOperationException.class)
+ public void testMerge() {
+ transaction.merge(PATH_1, DATA_1);
+ }
+
+ @Override
+ @Test(expected = UnsupportedOperationException.class)
+ public void testWrite() {
+ transaction.write(PATH_1, DATA_1);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDoDelete() {
+ transaction.doDelete(PATH_1);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDoMerge() {
+ transaction.doMerge(PATH_1, DATA_1);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testDoWrite() {
+ transaction.doWrite(PATH_1, DATA_1);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCommitRequest() {
+ transaction.commitRequest(true);
+ }
+
+ @Test
+ public void testApplyModifyTransactionRequest() throws Exception {
+ final TestProbe probe = createProbe();
+ final ModifyTransactionRequestBuilder builder =
+ new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+ builder.setSequence(0);
+ builder.setAbort();
+ final ModifyTransactionRequest request = builder.build();
+ transaction.applyModifyTransactionRequest(request, createCallbackMock());
+ getTester().expectTransactionRequest(AbortLocalTransactionRequest.class);
+ }
+
+ @Test
+ public void testApplyModifyTransactionRequestNotAbort() throws Exception {
+ final TestProbe probe = createProbe();
+ final ModifyTransactionRequestBuilder builder =
+ new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+ builder.setSequence(0);
+ builder.setReady();
+ final ModifyTransactionRequest request = builder.build();
+ assertOperationThrowsException(() -> transaction.applyModifyTransactionRequest(request, createCallbackMock()),
+ VerifyException.class);
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
+
+import akka.testkit.TestProbe;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.function.Consumer;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+public class LocalReadWriteProxyTransactionTest extends LocalProxyTransactionTest<LocalReadWriteProxyTransaction> {
+ @Mock
+ private CursorAwareDataTreeModification modification;
+
+ @Override
+ protected LocalReadWriteProxyTransaction createTransaction(final ProxyHistory parent,
+ final TransactionIdentifier id,
+ final DataTreeSnapshot snapshot) {
+ when(snapshot.newModification()).thenReturn(modification);
+ when(modification.readNode(PATH_1)).thenReturn(com.google.common.base.Optional.of(DATA_1));
+ when(modification.readNode(PATH_3)).thenReturn(com.google.common.base.Optional.absent());
+ return new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
+ }
+
+ @Test
+ public void testIsSnapshotOnly() throws Exception {
+ Assert.assertFalse(transaction.isSnapshotOnly());
+ }
+
+ @Test
+ public void testReadOnlyView() throws Exception {
+ Assert.assertEquals(modification, transaction.readOnlyView());
+ }
+
+ @Test
+ @Override
+ public void testDelete() throws Exception {
+ transaction.delete(PATH_1);
+ verify(modification).delete(PATH_1);
+ }
+
+ @Test
+ @Override
+ public void testDirectCommit() throws Exception {
+ transaction.seal();
+ final ListenableFuture<Boolean> result = transaction.directCommit();
+ final TransactionTester<LocalReadWriteProxyTransaction> tester = getTester();
+ final CommitLocalTransactionRequest req = tester.expectTransactionRequest(CommitLocalTransactionRequest.class);
+ tester.replySuccess(new TransactionCommitSuccess(TRANSACTION_ID, req.getSequence()));
+ assertFutureEquals(true, result);
+ }
+
+ @Test
+ @Override
+ public void testCanCommit() throws Exception {
+ testRequestResponse(transaction::canCommit, CommitLocalTransactionRequest.class,
+ TransactionCanCommitSuccess::new);
+ }
+
+ @Test
+ @Override
+ public void testPreCommit() throws Exception {
+ testRequestResponse(transaction::preCommit, TransactionPreCommitRequest.class,
+ TransactionPreCommitSuccess::new);
+ }
+
+ @Test
+ @Override
+ public void testDoCommit() throws Exception {
+ testRequestResponse(transaction::doCommit, TransactionDoCommitRequest.class, TransactionCommitSuccess::new);
+ }
+
+ @Test
+ @Override
+ public void testMerge() throws Exception {
+ transaction.merge(PATH_1, DATA_1);
+ verify(modification).merge(PATH_1, DATA_1);
+ }
+
+ @Test
+ @Override
+ public void testWrite() throws Exception {
+ transaction.write(PATH_1, DATA_1);
+ verify(modification).write(PATH_1, DATA_1);
+ }
+
+ @Test
+ public void testCommitRequest() throws Exception {
+ transaction.doWrite(PATH_1, DATA_1);
+ final boolean coordinated = true;
+ final CommitLocalTransactionRequest request = transaction.commitRequest(coordinated);
+ Assert.assertEquals(coordinated, request.isCoordinated());
+ Assert.assertEquals(modification, request.getModification());
+ }
+
+ @Test
+ public void testModifyAfterCommitRequest() throws Exception {
+ transaction.doWrite(PATH_1, DATA_1);
+ final boolean coordinated = true;
+ transaction.commitRequest(coordinated);
+ assertOperationThrowsException(() -> transaction.doMerge(PATH_1, DATA_1), IllegalStateException.class);
+ }
+
+ @Test
+ public void testDoSeal() throws Exception {
+ assertOperationThrowsException(() -> transaction.getSnapshot(), IllegalStateException.class);
+ transaction.doSeal();
+ Assert.assertEquals(modification, transaction.getSnapshot());
+ }
+
+ @Test
+ public void testFlushState() throws Exception {
+ final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
+ final RemoteProxyTransaction successor = transactionTester.getTransaction();
+ doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
+ transaction.doSeal();
+ transaction.flushState(successor);
+ verify(modification).applyToCursor(any());
+ transactionTester.getTransaction().seal();
+ transactionTester.getTransaction().directCommit();
+ final ModifyTransactionRequest modifyRequest =
+ transactionTester.expectTransactionRequest(ModifyTransactionRequest.class);
+ checkModifications(modifyRequest);
+ }
+
+ @Test
+ public void testApplyModifyTransactionRequestCoordinated() throws Exception {
+ applyModifyTransactionRequest(true);
+ }
+
+ @Test
+ public void testApplyModifyTransactionRequestSimple() throws Exception {
+ applyModifyTransactionRequest(false);
+ }
+
+ @Test
+ public void testApplyModifyTransactionRequestAbort() throws Exception {
+ final TestProbe probe = createProbe();
+ final ModifyTransactionRequestBuilder builder =
+ new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+ builder.setSequence(0L);
+ builder.setAbort();
+ final ModifyTransactionRequest request = builder.build();
+ final Consumer<Response<?, ?>> callback = createCallbackMock();
+ transaction.applyModifyTransactionRequest(request, callback);
+ getTester().expectTransactionRequest(TransactionAbortRequest.class);
+ }
+
+ @Test
+ public void testHandleForwardedRemotePreCommitRequest() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionPreCommitRequest request =
+ new TransactionPreCommitRequest(TRANSACTION_ID, 0L, probe.ref());
+ testHandleForwardedRemoteRequest(request);
+ }
+
+ @Test
+ public void testHandleForwardedRemoteDoCommitRequest() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionDoCommitRequest request =
+ new TransactionDoCommitRequest(TRANSACTION_ID, 0L, probe.ref());
+ testHandleForwardedRemoteRequest(request);
+ }
+
+ @Test
+ public void testHandleForwardedRemoteAbortRequest() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionAbortRequest request =
+ new TransactionAbortRequest(TRANSACTION_ID, 0L, probe.ref());
+ testHandleForwardedRemoteRequest(request);
+ }
+
+ @Test
+ public void testForwardToLocalCommit() throws Exception {
+ final TestProbe probe = createProbe();
+ final DataTreeModification mod = mock(DataTreeModification.class);
+ final TransactionRequest<?> request =
+ new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), mod, false);
+ testForwardToLocal(request, CommitLocalTransactionRequest.class);
+ }
+
+ @Test
+ public void testSendAbort() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionRequest<?> request = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
+ transaction.sendAbort(request, createCallbackMock());
+ assertOperationThrowsException(() -> transaction.delete(PATH_1), IllegalStateException.class);
+ }
+
+ private void applyModifyTransactionRequest(final boolean coordinated) {
+ final TestProbe probe = createProbe();
+ final ModifyTransactionRequestBuilder builder =
+ new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+ final TransactionModification write = new TransactionWrite(PATH_1, DATA_1);
+ final TransactionModification merge = new TransactionMerge(PATH_2, DATA_2);
+ final TransactionModification delete = new TransactionDelete(PATH_3);
+ builder.addModification(write);
+ builder.addModification(merge);
+ builder.addModification(delete);
+ builder.setSequence(0L);
+ builder.setCommit(coordinated);
+ final ModifyTransactionRequest request = builder.build();
+ final Consumer<Response<?, ?>> callback = createCallbackMock();
+ transaction.applyModifyTransactionRequest(request, callback);
+ verify(modification).write(PATH_1, DATA_1);
+ verify(modification).merge(PATH_2, DATA_2);
+ verify(modification).delete(PATH_3);
+ final CommitLocalTransactionRequest commitRequest =
+ getTester().expectTransactionRequest(CommitLocalTransactionRequest.class);
+ Assert.assertEquals(modification, commitRequest.getModification());
+ Assert.assertEquals(coordinated, commitRequest.isCoordinated());
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
+
+import akka.testkit.TestProbe;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+public class RemoteProxyTransactionTest extends AbstractProxyTransactionTest<RemoteProxyTransaction> {
+
+ @Override
+ protected RemoteProxyTransaction createTransaction(final ProxyHistory parent, final TransactionIdentifier id,
+ final DataTreeSnapshot snapshot) {
+ return new RemoteProxyTransaction(parent, TRANSACTION_ID, false, false);
+ }
+
+ @Override
+ @Test
+ public void testExists() throws Exception {
+ final TransactionTester<RemoteProxyTransaction> tester = getTester();
+ final CheckedFuture<Boolean, ReadFailedException> exists = transaction.exists(PATH_1);
+ final ExistsTransactionRequest req = tester.expectTransactionRequest(ExistsTransactionRequest.class);
+ final boolean existsResult = true;
+ tester.replySuccess(new ExistsTransactionSuccess(TRANSACTION_ID, req.getSequence(), existsResult));
+ assertFutureEquals(existsResult, exists);
+ }
+
+ @Override
+ @Test
+ public void testRead() throws Exception {
+ final TransactionTester<RemoteProxyTransaction> tester = getTester();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = transaction.read(PATH_2);
+ final ReadTransactionRequest req = tester.expectTransactionRequest(ReadTransactionRequest.class);
+ final Optional<NormalizedNode<?, ?>> result = Optional.of(DATA_1);
+ tester.replySuccess(new ReadTransactionSuccess(TRANSACTION_ID, req.getSequence(), result));
+ assertFutureEquals(result, read);
+ }
+
+ @Override
+ @Test
+ public void testWrite() throws Exception {
+ final YangInstanceIdentifier path = PATH_1;
+ testModification(() -> transaction.write(path, DATA_1), TransactionWrite.class, path);
+ }
+
+ @Override
+ @Test
+ public void testMerge() throws Exception {
+ final YangInstanceIdentifier path = PATH_2;
+ testModification(() -> transaction.merge(path, DATA_2), TransactionMerge.class, path);
+ }
+
+ @Override
+ @Test
+ public void testDelete() throws Exception {
+ final YangInstanceIdentifier path = PATH_3;
+ testModification(() -> transaction.delete(path), TransactionDelete.class, path);
+ }
+
+ @Override
+ @Test
+ public void testDirectCommit() throws Exception {
+ transaction.seal();
+ final ListenableFuture<Boolean> result = transaction.directCommit();
+ final TransactionTester<RemoteProxyTransaction> tester = getTester();
+ final ModifyTransactionRequest req = tester.expectTransactionRequest(ModifyTransactionRequest.class);
+ Assert.assertTrue(req.getPersistenceProtocol().isPresent());
+ Assert.assertEquals(PersistenceProtocol.SIMPLE, req.getPersistenceProtocol().get());
+ tester.replySuccess(new TransactionCommitSuccess(TRANSACTION_ID, req.getSequence()));
+ assertFutureEquals(true, result);
+ }
+
+ @Override
+ @Test
+ public void testCanCommit() throws Exception {
+ testRequestResponse(transaction::canCommit, ModifyTransactionRequest.class,
+ TransactionCanCommitSuccess::new);
+ }
+
+ @Override
+ @Test
+ public void testPreCommit() throws Exception {
+ testRequestResponse(transaction::preCommit, TransactionPreCommitRequest.class,
+ TransactionPreCommitSuccess::new);
+ }
+
+ @Override
+ @Test
+ public void testDoCommit() throws Exception {
+ testRequestResponse(transaction::doCommit, TransactionDoCommitRequest.class, TransactionCommitSuccess::new);
+ }
+
+ @Override
+ @Test
+ public void testForwardToRemoteAbort() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionAbortRequest request = new TransactionAbortRequest(TRANSACTION_ID, 0L, probe.ref());
+ testForwardToRemote(request, TransactionAbortRequest.class);
+
+ }
+
+ @Override
+ public void testForwardToRemoteCommit() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionAbortRequest request = new TransactionAbortRequest(TRANSACTION_ID, 0L, probe.ref());
+ testForwardToRemote(request, TransactionAbortRequest.class);
+ }
+
+ @Test
+ public void testForwardToRemoteModifyCommitSimple() throws Exception {
+ final TestProbe probe = createProbe();
+ final ModifyTransactionRequestBuilder builder =
+ new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+ builder.setSequence(0L);
+ builder.setCommit(false);
+ final ModifyTransactionRequest request = builder.build();
+ final ModifyTransactionRequest received = testForwardToRemote(request, ModifyTransactionRequest.class);
+ Assert.assertEquals(request.getPersistenceProtocol(), received.getPersistenceProtocol());
+ Assert.assertEquals(request.getModifications(), received.getModifications());
+ Assert.assertEquals(request.getTarget(), received.getTarget());
+ }
+
+ @Test
+ public void testForwardToRemoteModifyCommit3Phase() throws Exception {
+ final TestProbe probe = createProbe();
+ final ModifyTransactionRequestBuilder builder =
+ new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+ builder.setSequence(0L);
+ builder.setCommit(true);
+ final ModifyTransactionRequest request = builder.build();
+ final ModifyTransactionRequest received = testForwardToRemote(request, ModifyTransactionRequest.class);
+ Assert.assertEquals(request.getPersistenceProtocol(), received.getPersistenceProtocol());
+ Assert.assertEquals(request.getModifications(), received.getModifications());
+ Assert.assertEquals(request.getTarget(), received.getTarget());
+ }
+
+ @Test
+ public void testForwardToRemoteModifyAbort() throws Exception {
+ final TestProbe probe = createProbe();
+ final ModifyTransactionRequestBuilder builder =
+ new ModifyTransactionRequestBuilder(TRANSACTION_ID, probe.ref());
+ builder.setSequence(0L);
+ builder.setAbort();
+ final ModifyTransactionRequest request = builder.build();
+ final TransactionAbortRequest received = testForwardToRemote(request, TransactionAbortRequest.class);
+ Assert.assertEquals(request.getTarget(), received.getTarget());
+ }
+
+ @Test
+ public void testForwardToRemoteModifyRead() throws Exception {
+ final TestProbe probe = createProbe();
+ final ReadTransactionRequest request =
+ new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, false);
+ final ReadTransactionRequest received = testForwardToRemote(request, ReadTransactionRequest.class);
+ Assert.assertEquals(request.getTarget(), received.getTarget());
+ Assert.assertEquals(request.getPath(), received.getPath());
+ }
+
+ @Test
+ public void testForwardToRemoteModifyExists() throws Exception {
+ final TestProbe probe = createProbe();
+ final ExistsTransactionRequest request =
+ new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, false);
+ final ExistsTransactionRequest received = testForwardToRemote(request, ExistsTransactionRequest.class);
+ Assert.assertEquals(request.getTarget(), received.getTarget());
+ Assert.assertEquals(request.getPath(), received.getPath());
+ }
+
+ @Test
+ public void testForwardToRemoteModifyPreCommit() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionPreCommitRequest request =
+ new TransactionPreCommitRequest(TRANSACTION_ID, 0L, probe.ref());
+ final TransactionPreCommitRequest received = testForwardToRemote(request, TransactionPreCommitRequest.class);
+ Assert.assertEquals(request.getTarget(), received.getTarget());
+ }
+
+ @Test
+ public void testForwardToRemoteModifyDoCommit() throws Exception {
+ final TestProbe probe = createProbe();
+ final TransactionDoCommitRequest request =
+ new TransactionDoCommitRequest(TRANSACTION_ID, 0L, probe.ref());
+ final TransactionDoCommitRequest received = testForwardToRemote(request, TransactionDoCommitRequest.class);
+ Assert.assertEquals(request.getTarget(), received.getTarget());
+ }
+
+
+ private <T extends TransactionModification> void testModification(final Runnable modification,
+ final Class<T> cls,
+ final YangInstanceIdentifier expectedPath) {
+ modification.run();
+ final ModifyTransactionRequest request = transaction.commitRequest(false);
+ final List<TransactionModification> modifications = request.getModifications();
+ Assert.assertEquals(1, modifications.size());
+ Assert.assertThat(modifications, hasItem(both(isA(cls)).and(hasPath(expectedPath))));
+ }
+
+}
\ No newline at end of file
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
/**
* Helper class. Allows checking messages received by backend and respond to them.
*/
-class TranasactionTester {
+class TransactionTester<T extends AbstractProxyTransaction> {
- private final RemoteProxyTransaction transaction;
+ private final T transaction;
private final AbstractClientConnection<ShardBackendInfo> connection;
private final TestProbe backendProbe;
private RequestEnvelope envelope;
- TranasactionTester(final RemoteProxyTransaction transaction,
- final AbstractClientConnection<ShardBackendInfo> connection,
- final TestProbe backendProbe) {
+ TransactionTester(final T transaction,
+ final AbstractClientConnection<ShardBackendInfo> connection,
+ final TestProbe backendProbe) {
this.transaction = transaction;
this.connection = connection;
this.backendProbe = backendProbe;
}
- RemoteProxyTransaction getTransaction() {
+ T getTransaction() {
return transaction;
}
return (TransactionRequest) envelope.getMessage();
}
- <T extends TransactionRequest> T expectTransactionRequest(final Class<T> expected) {
+ <R extends TransactionRequest> R expectTransactionRequest(final Class<R> expected) {
envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
- Assert.assertTrue(expected.isAssignableFrom(envelope.getMessage().getClass()));
- return (T) envelope.getMessage();
+ final Class<? extends Request> actual = envelope.getMessage().getClass();
+ final String errorMsg = String.format("Expected instance of %s, received %s", expected, actual);
+ Assert.assertTrue(errorMsg, expected.isAssignableFrom(actual));
+ return (R) envelope.getMessage();
}
void replySuccess(final RequestSuccess<?, ?> success) {