X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmittingTransmitQueueTest.java;h=8db4803c43c9bec7b43c94b6ee7ec100d163a41e;hb=refs%2Fchanges%2F39%2F74339%2F5;hp=752c12771a67bb755c2063b2a16a6b10c2f5a48f;hpb=28551609a31799a43d3017ba0681e198f5136d70;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index 752c12771a..8db4803c43 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -12,14 +12,17 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; import com.google.common.base.Ticker; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.testing.FakeTicker; import java.util.Arrays; import java.util.Collection; @@ -28,7 +31,9 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.junit.Test; +import org.mockito.ArgumentCaptor; import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; @@ -39,10 +44,17 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest { private BackendInfo backendInfo; + private final MessageSlicer mockMessageSlicer = mock(MessageSlicer.class); + + private static long now() { + return Ticker.systemTicker().read(); + } @Override protected int getMaxInFlightMessages() { @@ -51,12 +63,13 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref()); @@ -64,11 +77,11 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref()); final Consumer> callback1 = createConsumerMock(); final Consumer> callback2 = createConsumerMock(); - final long now1 = Ticker.systemTicker().read(); - final long now2 = Ticker.systemTicker().read(); + final long now1 = now(); + final long now2 = now(); //enqueue 2 entries - queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1); - queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2); + queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1); + queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2); final RequestSuccess success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1); final RequestSuccess success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2); //complete entries in different order @@ -89,63 +102,72 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); - final long now = Ticker.systemTicker().read(); - queue.enqueue(new ConnectionEntry(request, callback, now), now); + final long now = now(); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); assertEquals(request, requestEnvelope.getMessage()); } @Test - public void testEnqueueBackendFull() throws Exception { + public void testEnqueueBackendFull() { final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); - final long now = Ticker.systemTicker().read(); + final long now = now(); final int sentMessages = getMaxInFlightMessages() + 1; for (int i = 0; i < sentMessages; i++) { - queue.enqueue(new ConnectionEntry(request, callback, now), now); + queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now); } for (int i = 0; i < getMaxInFlightMessages(); i++) { probe.expectMsgClass(RequestEnvelope.class); } probe.expectNoMsg(); - final Iterable entries = queue.asIterable(); - assertEquals(sentMessages, Iterables.size(entries)); + final Collection entries = queue.drain(); + assertEquals(sentMessages, entries.size()); assertThat(entries, everyItem(entryWithRequest(request))); } @Test @Override - public void testCanTransmitCount() throws Exception { + public void testCanTransmitCount() { assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0); assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0); } @Test @Override - public void testTransmit() throws Exception { + public void testTransmit() { final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); - final long now = Ticker.systemTicker().read(); + final long now = now(); final ConnectionEntry entry = new ConnectionEntry(request, callback, now); - queue.transmit(entry, now); + + Optional transmitted = queue.transmit(entry, now); + assertTrue(transmitted.isPresent()); + assertEquals(request, transmitted.get().getRequest()); + assertEquals(callback, transmitted.get().getCallback()); + final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); assertEquals(request, requestEnvelope.getMessage()); + + transmitted = queue.transmit(new ConnectionEntry(new TransactionPurgeRequest( + TRANSACTION_IDENTIFIER, 1L, probe.ref()), callback, now), now); + assertTrue(transmitted.isPresent()); } @Test - public void testSetForwarder() throws Exception { + public void testSetForwarder() { final FakeTicker ticker = new FakeTicker(); ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS); final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read()); final ReconnectForwarder forwarder = mock(ReconnectForwarder.class); - queue.setForwarder(forwarder); + queue.setForwarder(forwarder, ticker.read()); final long secondEnqueueNow = ticker.read(); - queue.enqueue(entry, secondEnqueueNow); + queue.enqueueOrForward(entry, secondEnqueueNow); verify(forwarder).forwardEntry(entry, secondEnqueueNow); } @@ -161,10 +183,10 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest> callback = createConsumerMock(); // Fill the queue up to capacity + 1 - queue.enqueue(new ConnectionEntry(req0, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req1, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req2, callback, 0), 0); - queue.enqueue(new ConnectionEntry(req3, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0); + queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0); assertEqualRequests(queue.getInflight(), req0, req1, req2); assertEqualRequests(queue.getPending(), req3); @@ -179,12 +201,12 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = reqBuilder.build(); + + final long now = now(); + final Consumer> mockConsumer = createConsumerMock(); + Optional transmitted = + queue.transmit(new ConnectionEntry(request, mockConsumer, now), now); + assertTrue(transmitted.isPresent()); + + ArgumentCaptor sliceOptions = ArgumentCaptor.forClass(SliceOptions.class); + verify(mockMessageSlicer).slice(sliceOptions.capture()); + assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope); + RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage(); + assertEquals(request, requestEnvelope.getMessage()); + + final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + transmitted = queue.transmit(new ConnectionEntry(request2, mockConsumer, now), now); + assertFalse(transmitted.isPresent()); + } + + @Test + public void testSlicingFailureOnTransmit() { + doAnswer(invocation -> { + invocation.getArgumentAt(0, SliceOptions.class).getOnFailureCallback().accept(new Exception("mock")); + return Boolean.FALSE; + }).when(mockMessageSlicer).slice(any()); + + ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder( + TRANSACTION_IDENTIFIER, probe.ref()); + reqBuilder.setSequence(0L); + + final long now = now(); + Optional transmitted = + queue.transmit(new ConnectionEntry(reqBuilder.build(), createConsumerMock(), now), now); + assertTrue(transmitted.isPresent()); + + verify(mockMessageSlicer).slice(any()); + + probe.expectMsgClass(FailureEnvelope.class); + } + + @Test + public void testSlicedRequestOnComplete() { + doReturn(true).when(mockMessageSlicer).slice(any()); + + ModifyTransactionRequestBuilder reqBuilder = new ModifyTransactionRequestBuilder( + TRANSACTION_IDENTIFIER, probe.ref()); + reqBuilder.setSequence(0L); + final Request request = reqBuilder.build(); + + final long now = now(); + final Consumer> mockConsumer = createConsumerMock(); + queue.enqueueOrForward(new ConnectionEntry(request, mockConsumer, now), now); + + ArgumentCaptor sliceOptions = ArgumentCaptor.forClass(SliceOptions.class); + verify(mockMessageSlicer).slice(sliceOptions.capture()); + assertTrue(sliceOptions.getValue().getMessage() instanceof RequestEnvelope); + + final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + queue.enqueueOrForward(new ConnectionEntry(request2, mockConsumer, now), now); + verifyNoMoreInteractions(mockMessageSlicer); + probe.expectNoMsg(); + + RequestEnvelope requestEnvelope = (RequestEnvelope) sliceOptions.getValue().getMessage(); + queue.complete(new FailureEnvelope(request.toRequestFailure(mock(RequestException.class)), + requestEnvelope.getSessionId(), requestEnvelope.getTxSequence(), 0), 0); + + requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); + assertEquals(request2, requestEnvelope.getMessage()); + + final Request request3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref()); + queue.enqueueOrForward(new ConnectionEntry(request3, mockConsumer, now), now); + + requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); + assertEquals(request3, requestEnvelope.getMessage()); + } + private static void assertEqualRequests(final Collection queue, final Request... requests) { final List> queued = ImmutableList.copyOf(Collections2.transform(queue,