+ @Test
+ public void testRequestSlicingOnTransmit() throws Exception {
+ doReturn(true).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+ final Request<?, ?> request = reqBuilder.build();
+
+ final long now = now();
+ final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+ Optional<TransmittedConnectionEntry> transmitted =
+ queue.transmit(new ConnectionEntry(request, mockConsumer, now), now);
+ assertTrue(transmitted.isPresent());
+
+ ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+ verify(mockMessageSlicer).slice(sliceOptions.capture());
+ assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+ RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+ assertEquals(request, requestEnvelope.getMessage());
+
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now);
+ assertFalse(transmitted.isPresent());
+ }
+
+ @Test
+ public void testSlicingFailureOnTransmit() throws Exception {
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock"));
+ return Boolean.FALSE;
+ }).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+
+ final long now = now();
+ Optional<TransmittedConnectionEntry> transmitted =
+ queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now);
+ assertTrue(transmitted.isPresent());
+
+ verify(mockMessageSlicer).slice(any());
+
+ probe.expectMsgClass(FailureEnvelope.class);
+ }
+
+ @Test
+ public void testSlicedRequestOnComplete() throws Exception {
+ doReturn(true).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+ final Request<?, ?> request = reqBuilder.build();
+
+ final long now = now();
+ final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+ queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now);
+
+ ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+ verify(mockMessageSlicer).slice(sliceOptions.capture());
+ assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now);
+ verifyNoMoreInteractions(mockMessageSlicer);
+ probe.expectNoMsg();
+
+ RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+ queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)),
+ requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0);
+
+ requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+ assertEquals(request2, requestEnvelope.getMessage());
+
+ final Request<?, ?> request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+ queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now);
+
+ requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+ assertEquals(request3, requestEnvelope.getMessage());
+ }
+