X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmittingTransmitQueueTest.java;h=8db4803c43c9bec7b43c94b6ee7ec100d163a41e;hb=refs%2Fchanges%2F39%2F74339%2F5;hp=11fc421903b77c522ed3c0d0dffd5e9e6cff3ca0;hpb=dafc95d149bc62f101de37e94b9b5e3526d4e87b;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index 11fc421903..8db4803c43 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -12,8 +12,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; import com.google.common.base.Ticker; @@ -27,7 +31,9 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; @@ -38,10 +44,13 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest { private BackendInfo backendInfo; + private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class); private static long now() { return Ticker.systemTicker().read(); @@ -54,12 +63,13 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref()); @@ -70,8 +80,8 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1); final RequestSuccess success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2); //complete entries in different order @@ -92,23 +102,23 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = now(); - queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); assertEquals(request, requestEnvelope.getMessage()); } @Test - public void testEnqueueBackendFull() throws Exception { + public void testEnqueueBackendFull() { final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = now(); final int sentMessages = getMaxInFlightMessages() + 1; for (int i = 0; i < sentMessages; i++) { - queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); } for (int i = 0; i < getMaxInFlightMessages(); i++) { probe.expectMsgClass(RequestEnvelope.class); @@ -121,25 +131,34 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest 0); assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0); } @Test @Override - public void testTransmit() throws Exception { + public void testTransmit() { final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = now(); final ConnectionEntry entry = new ConnectionEntry(request, callback, now); - queue.transmit(entry, now); + + Optional transmitted = queue.transmit(entry, now); + assertTrue(transmitted.isPresent()); + assertEquals(request, transmitted.get().getRequest()); + assertEquals(callback, transmitted.get().getCallback()); + final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); assertEquals(request, requestEnvelope.getMessage()); + + transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest( + TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now); + assertTrue(transmitted.isPresent()); } @Test - public void testSetForwarder() throws Exception { + public void testSetForwarder() { final FakeTicker ticker = new FakeTicker(); ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS); final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); @@ -148,7 +167,7 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest> callback = createConsumerMock(); // Fill the queue up to capacity + 1 - queue.enqueue(new ConnectionEntry(req0, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req1, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req2, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req3, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0); assertEqualRequests(queue.getInflight(), req0, req1, req2); assertEqualRequests(queue.getPending(), req3); @@ -182,12 +201,12 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = reqBuilder.build(); + + final long now = now(); + final Consumer> mockConsumer = createConsumerMock(); + Optional transmitted = + queue.transmit(new ConnectionEntry(request, mockConsumer, now), now); + assertTrue(transmitted.isPresent()); + + ArgumentCaptor sliceOptions = ArgumentCaptor.forClass(SliceOptions.class); + verify(mockMessageSlicer).slice(sliceOptions.capture()); + assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope); + RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage(); + assertEquals(request, requestEnvelope.getMessage()); + + final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now); + assertFalse(transmitted.isPresent()); + } + + @Test + public void testSlicingFailureOnTransmit() { + doAnswer(invocation -> { + invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock")); + return Boolean.FALSE; + }).when(mockMessageSlicer).slice(any()); + + ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder( + TRANSACTION_IDENTIFIER, probe.ref()); + reqBuilder.setSequence(0L); + + final long now = now(); + Optional transmitted = + queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now); + assertTrue(transmitted.isPresent()); + + verify(mockMessageSlicer).slice(any()); + + probe.expectMsgClass(FailureEnvelope.class); + } + + @Test + public void testSlicedRequestOnComplete() { + doReturn(true).when(mockMessageSlicer).slice(any()); + + ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder( + TRANSACTION_IDENTIFIER, probe.ref()); + reqBuilder.setSequence(0L); + final Request request = reqBuilder.build(); + + final long now = now(); + final Consumer> mockConsumer = createConsumerMock(); + queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now); + + ArgumentCaptor sliceOptions = ArgumentCaptor.forClass(SliceOptions.class); + verify(mockMessageSlicer).slice(sliceOptions.capture()); + assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope); + + final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now); + verifyNoMoreInteractions(mockMessageSlicer); + probe.expectNoMsg(); + + RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage(); + queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)), + requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0); + + requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); + assertEquals(request2, requestEnvelope.getMessage()); + + final Request request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref()); + queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now); + + requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); + assertEquals(request3, requestEnvelope.getMessage()); + } + private static void assertEqualRequests(final Collection queue, final Request... requests) { final List> queued = ImmutableList.copyOf(Collections2.transform(queue,