import static org.mockito.Mockito.when;
import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
+import akka.testkit.javadsl.TestKit;
+import com.google.common.base.Ticker;
import com.google.common.primitives.UnsignedLong;
import java.util.ArrayList;
import java.util.List;
import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
private TestProbe backendProbe;
private TestProbe clientContextProbe;
private TransactionTester<T> tester;
+ protected ClientActorContext context;
protected T transaction;
@Before
system = ActorSystem.apply();
clientContextProbe = new TestProbe(system, "clientContext");
backendProbe = new TestProbe(system, "backend");
- final ClientActorContext context =
- AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
+ context = AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID,
+ PERSISTENCE_ID);
final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
"default", UnsignedLong.ZERO, Optional.empty(), 3);
final AbstractClientConnection<ShardBackendInfo> connection =
tester = new TransactionTester<>(transaction, connection, backendProbe);
}
+ @SuppressWarnings("checkstyle:hiddenField")
protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
@After
public void tearDown() throws Exception {
- JavaTestKit.shutdownActorSystem(system);
+ TestKit.shutdownActorSystem(system);
}
@Test
final List<ConnectionEntry> entries = new ArrayList<>();
final Consumer<Response<?, ?>> callback = createCallbackMock();
final ReadTransactionRequest request1 =
- new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
+ new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
final ExistsTransactionRequest request2 =
- new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
+ new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
transaction.recordSuccessfulRequest(successful1);
final ReadTransactionRequest successful2 =
- new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+ new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
transaction.recordSuccessfulRequest(successful2);
transaction.startReconnect();
- transaction.replayMessages(successor.getTransaction(), entries);
- Assert.assertEquals(successful1, successor.expectTransactionRequest(AbortLocalTransactionRequest.class));
- Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
- Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
- Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
+
+ final ProxyHistory mockSuccessor = mock(ProxyHistory.class);
+ when(mockSuccessor.createTransactionProxy(TRANSACTION_ID, transaction.isSnapshotOnly(), false))
+ .thenReturn(successor.getTransaction());
+
+ transaction.replayMessages(mockSuccessor, entries);
+
+ final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
+ Assert.assertNotNull(transformed);
+ Assert.assertEquals(successful1.getSequence(), transformed.getSequence());
+ Assert.assertTrue(transformed.getPersistenceProtocol().isPresent());
+ Assert.assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
+
+ ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+ Assert.assertNotNull(tmpRead);
+ Assert.assertEquals(successful2.getTarget(), tmpRead.getTarget());
+ Assert.assertEquals(successful2.getSequence(), tmpRead.getSequence());
+ Assert.assertEquals(successful2.getPath(), tmpRead.getPath());
+ Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+ tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+ Assert.assertNotNull(tmpRead);
+ Assert.assertEquals(request1.getTarget(), tmpRead.getTarget());
+ Assert.assertEquals(request1.getSequence(), tmpRead.getSequence());
+ Assert.assertEquals(request1.getPath(), tmpRead.getPath());
+ Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+ final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
+ Assert.assertNotNull(tmpExist);
+ Assert.assertEquals(request2.getTarget(), tmpExist.getTarget());
+ Assert.assertEquals(request2.getSequence(), tmpExist.getSequence());
+ Assert.assertEquals(request2.getPath(), tmpExist.getPath());
+ Assert.assertEquals(successor.localActor(), tmpExist.getReplyTo());
}
protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
final List<TransactionModification> modifications = modifyRequest.getModifications();
Assert.assertEquals(3, modifications.size());
- Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and((hasPath(PATH_1)))));
- Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and((hasPath(PATH_2)))));
- Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and((hasPath(PATH_3)))));
+ Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and(hasPath(PATH_1))));
+ Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and(hasPath(PATH_2))));
+ Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and(hasPath(PATH_3))));
}
- protected void testRequestResponse(final Consumer<VotingFuture> consumer,
- final Class<? extends TransactionRequest> expectedRequest,
- final BiFunction<TransactionIdentifier, Long, TransactionSuccess> replySupplier)
- throws Exception {
+ @SuppressWarnings("checkstyle:hiddenField")
+ protected <R extends TransactionRequest<R>> void testRequestResponse(final Consumer<VotingFuture<Void>> consumer,
+ final Class<R> expectedRequest,
+ final BiFunction<TransactionIdentifier, Long, TransactionSuccess<?>> replySupplier) throws Exception {
final TransactionTester<T> tester = getTester();
- final VotingFuture future = mock(VotingFuture.class);
+ final VotingFuture<Void> future = mock(VotingFuture.class);
transaction.seal();
consumer.accept(future);
- final TransactionRequest req = tester.expectTransactionRequest(expectedRequest);
+ final TransactionRequest<?> req = tester.expectTransactionRequest(expectedRequest);
tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
verify(future).voteYes();
}
- protected <T extends TransactionRequest> T testHandleForwardedRemoteRequest(final T request) throws Exception {
- transaction.handleForwardedRemoteRequest(request, createCallbackMock());
+ protected <R extends TransactionRequest<R>> R testHandleForwardedRemoteRequest(final R request) throws Exception {
+ transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
- final T received = (T) envelope.getMessage();
+ final R received = (R) envelope.getMessage();
Assert.assertTrue(received.getClass().equals(request.getClass()));
Assert.assertEquals(TRANSACTION_ID, received.getTarget());
Assert.assertEquals(clientContextProbe.ref(), received.getReplyTo());
return received;
}
- protected <T extends TransactionRequest> T testForwardToRemote(final TransactionRequest toForward,
- final Class<T> expectedMessageClass) {
+ protected <R extends TransactionRequest<R>> R testForwardToRemote(final TransactionRequest<?> toForward,
+ final Class<R> expectedMessageClass) {
final Consumer<Response<?, ?>> callback = createCallbackMock();
final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
final RemoteProxyTransaction successor = transactionTester.getTransaction();
}
@SuppressWarnings("unchecked")
- protected <T> Consumer<T> createCallbackMock() {
- return (Consumer<T>) mock(Consumer.class);
+ protected static <T> Consumer<T> createCallbackMock() {
+ return mock(Consumer.class);
}
protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
return new TestProbe(system);
}
+ @SuppressWarnings("checkstyle:hiddenField")
protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
final TestProbe backendProbe = new TestProbe(system, "backend2");
final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
return new TransactionTester<>(tx, connection, backendProbe);
}
+ @SuppressWarnings("checkstyle:hiddenField")
protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
- new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+ new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
return new TransactionTester<>(transaction, connection, backendProbe);
}
}