import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
+import com.google.common.base.Ticker;
import com.google.common.primitives.UnsignedLong;
import java.util.ArrayList;
import java.util.List;
final List<ConnectionEntry> entries = new ArrayList<>();
final Consumer<Response<?, ?>> callback = createCallbackMock();
final ReadTransactionRequest request1 =
- new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
+ new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
final ExistsTransactionRequest request2 =
- new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
+ new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
transaction.recordSuccessfulRequest(successful1);
final ReadTransactionRequest successful2 =
- new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
+ new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
transaction.recordSuccessfulRequest(successful2);
transaction.startReconnect();
transaction.replayMessages(successor.getTransaction(), entries);
Assert.assertEquals(successful1.getSequence(), transformed.getSequence());
Assert.assertTrue(transformed.getPersistenceProtocol().isPresent());
Assert.assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
- Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
- Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
- Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
+
+ ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+ Assert.assertNotNull(tmpRead);
+ Assert.assertEquals(successful2.getTarget(), tmpRead.getTarget());
+ Assert.assertEquals(successful2.getSequence(), tmpRead.getSequence());
+ Assert.assertEquals(successful2.getPath(), tmpRead.getPath());
+ Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+ tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
+ Assert.assertNotNull(tmpRead);
+ Assert.assertEquals(request1.getTarget(), tmpRead.getTarget());
+ Assert.assertEquals(request1.getSequence(), tmpRead.getSequence());
+ Assert.assertEquals(request1.getPath(), tmpRead.getPath());
+ Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
+
+ final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
+ Assert.assertNotNull(tmpExist);
+ Assert.assertEquals(request2.getTarget(), tmpExist.getTarget());
+ Assert.assertEquals(request2.getSequence(), tmpExist.getSequence());
+ Assert.assertEquals(request2.getPath(), tmpExist.getPath());
+ Assert.assertEquals(successor.localActor(), tmpExist.getReplyTo());
}
protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
}
protected <T extends TransactionRequest<?>> T testHandleForwardedRemoteRequest(final T request) throws Exception {
- transaction.handleForwardedRemoteRequest(request, createCallbackMock());
+ transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
final T received = (T) envelope.getMessage();
Assert.assertTrue(received.getClass().equals(request.getClass()));