Slice front-end request messages
[controller.git] / opendaylight / md-sal / cds-access-client / src / test / java / org / opendaylight / controller / cluster / access / client / TransmittingTransmitQueueTest.java
index f7ea931365e819ab0529f6ada509be54c428ee32..b6636553ffa661b9b4dd48411d61f75f7090a76c 100644 (file)
@@ -12,8 +12,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import com.google.common.base.Ticker;
@@ -27,7 +31,9 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
@@ -38,10 +44,13 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
 
 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
 
     private BackendInfo backendInfo;
+    private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class);
 
     private static long now() {
         return Ticker.systemTicker().read();
@@ -54,8 +63,9 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
 
     @Override
     protected TransmitQueue.Transmitting createQueue() {
+        doReturn(false).when(mockMessageSlicer).slice(any());
         backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
-        return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
+        return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now(), mockMessageSlicer);
     }
 
     @Test
@@ -133,9 +143,18 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = now();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
-        queue.transmit(entry, now);
+
+        Optional<TransmittedConnectionEntry> transmitted = queue.transmit(entry, now);
+        assertTrue(transmitted.isPresent());
+        assertEquals(request, transmitted.get().getRequest());
+        assertEquals(callback, transmitted.get().getCallback());
+
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
         assertEquals(request, requestEnvelope.getMessage());
+
+        transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest(
+                TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now);
+        assertTrue(transmitted.isPresent());
     }
 
     @Test
@@ -202,6 +221,89 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         assertEqualRequests(queue.getPending(), req6);
     }
 
+    @Test
+    public void testRequestSlicingOnTransmit() throws Exception {
+        doReturn(true).when(mockMessageSlicer).slice(any());
+
+        ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+                TRANSACTION_IDENTIFIER, probe.ref());
+        reqBuilder.setSequence(0L);
+        final Request<?, ?> request = reqBuilder.build();
+
+        final long now = now();
+        final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+        Optional<TransmittedConnectionEntry> transmitted =
+                queue.transmit(new ConnectionEntry(request, mockConsumer, now), now);
+        assertTrue(transmitted.isPresent());
+
+        ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+        verify(mockMessageSlicer).slice(sliceOptions.capture());
+        assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+        RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+        assertEquals(request, requestEnvelope.getMessage());
+
+        final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now);
+        assertFalse(transmitted.isPresent());
+    }
+
+    @Test
+    public void testSlicingFailureOnTransmit() throws Exception {
+        doAnswer(invocation -> {
+            invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock"));
+            return Boolean.FALSE;
+        }).when(mockMessageSlicer).slice(any());
+
+        ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+                TRANSACTION_IDENTIFIER, probe.ref());
+        reqBuilder.setSequence(0L);
+
+        final long now = now();
+        Optional<TransmittedConnectionEntry> transmitted =
+                queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now);
+        assertTrue(transmitted.isPresent());
+
+        verify(mockMessageSlicer).slice(any());
+
+        probe.expectMsgClass(FailureEnvelope.class);
+    }
+
+    @Test
+    public void testSlicedRequestOnComplete() throws Exception {
+        doReturn(true).when(mockMessageSlicer).slice(any());
+
+        ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder(
+                TRANSACTION_IDENTIFIER, probe.ref());
+        reqBuilder.setSequence(0L);
+        final Request<?, ?> request = reqBuilder.build();
+
+        final long now = now();
+        final Consumer<Response<?, ?>> mockConsumer = createConsumerMock();
+        queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now);
+
+        ArgumentCaptor<SliceOptions> sliceOptions = ArgumentCaptor.forClass(SliceOptions.class);
+        verify(mockMessageSlicer).slice(sliceOptions.capture());
+        assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope);
+
+        final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now);
+        verifyNoMoreInteractions(mockMessageSlicer);
+        probe.expectNoMsg();
+
+        RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage();
+        queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)),
+                requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0);
+
+        requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+        assertEquals(request2, requestEnvelope.getMessage());
+
+        final Request<?, ?> request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+        queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now);
+
+        requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
+        assertEquals(request3, requestEnvelope.getMessage());
+    }
+
     private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
             final Request<?, ?>... requests) {
         final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,