import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
import com.google.common.base.Ticker;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
private BackendInfo backendInfo;
+ private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class);
private static long now() {
return Ticker.systemTicker().read();
@Override
protected TransmitQueue.Transmitting createQueue() {
+ doReturn(false).when(mockMessageSlicer).slice(any());
backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
- return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
+ return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now(), mockMessageSlicer);
}
@Test
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = now();
final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
- queue.transmit(entry, now);
+
+ Optional<TransmittedConnectionEntry> transmitted = queue.transmit(entry, now);
+ assertTrue(transmitted.isPresent());
+ assertEquals(request, transmitted.get().getRequest());
+ assertEquals(callback, transmitted.get().getCallback());
+
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
assertEquals(request, requestEnvelope.getMessage());
+
+ transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest(
+ TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now);
+ assertTrue(transmitted.isPresent());
}
@Test
assertEqualRequests(queue.getPending(), req6);
}
+ @Test
+ public void testRequestSlicingOnTransmit() throws Exception {
+ doReturn(true).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+ final Request<?, ?> request = reqBuilder.build();
+
+ final long now = now();
+ final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+ Optional<TransmittedConnectionEntry> transmitted =
+ queue.transmit(new ConnectionEntry(request, mockConsumer, now), now);
+ assertTrue(transmitted.isPresent());
+
+ ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+ verify(mockMessageSlicer).slice(sliceOptions.capture());
+ assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+ RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+ assertEquals(request, requestEnvelope.getMessage());
+
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now);
+ assertFalse(transmitted.isPresent());
+ }
+
+ @Test
+ public void testSlicingFailureOnTransmit() throws Exception {
+ doAnswer(invocation -> {
+ invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock"));
+ return Boolean.FALSE;
+ }).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+
+ final long now = now();
+ Optional<TransmittedConnectionEntry> transmitted =
+ queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now);
+ assertTrue(transmitted.isPresent());
+
+ verify(mockMessageSlicer).slice(any());
+
+ probe.expectMsgClass(FailureEnvelope.class);
+ }
+
+ @Test
+ public void testSlicedRequestOnComplete() throws Exception {
+ doReturn(true).when(mockMessageSlicer).slice(any());
+
+ ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+ TRANSACTION_IDENTIFIER, probe.ref());
+ reqBuilder.setSequence(0L);
+ final Request<?, ?> request = reqBuilder.build();
+
+ final long now = now();
+ final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+ queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now);
+
+ ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+ verify(mockMessageSlicer).slice(sliceOptions.capture());
+ assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now);
+ verifyNoMoreInteractions(mockMessageSlicer);
+ probe.expectNoMsg();
+
+ RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+ queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)),
+ requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0);
+
+ requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+ assertEquals(request2, requestEnvelope.getMessage());
+
+ final Request<?, ?> request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+ queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now);
+
+ requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+ assertEquals(request3, requestEnvelope.getMessage());
+ }
+
private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
final Request<?, ?>... requests) {
final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,