X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmittingTransmitQueueTest.java;h=358cc9d408e9cbe7dc47d37e0c06244342f5f8d8;hp=4859591384bfd915e91dc9719e2cc0c8e447fb62;hb=503d824302de98ae7d9fd44c6c417ed651865919;hpb=43f18b836755a1167e831687a674d26611f9d3b2 diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index 4859591384..358cc9d408 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -8,6 +8,10 @@ package org.opendaylight.controller.cluster.access.client; import static org.hamcrest.CoreMatchers.everyItem; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; @@ -15,18 +19,24 @@ import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; import com.google.common.base.Ticker; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.testing.FakeTicker; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; +import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; @@ -51,9 +61,9 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref()); final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L); - final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref()); + final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref()); final Consumer> callback1 = createConsumerMock(); final Consumer> callback2 = createConsumerMock(); final long now1 = Ticker.systemTicker().read(); @@ -70,29 +80,29 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); queue.enqueue(new ConnectionEntry(request, callback, now), now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); - Assert.assertEquals(request, requestEnvelope.getMessage()); + assertEquals(request, requestEnvelope.getMessage()); } @Test public void testEnqueueBackendFull() throws Exception { - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); final int sentMessages = getMaxInFlightMessages() + 1; @@ -104,34 +114,34 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest entries = queue.asIterable(); - Assert.assertEquals(sentMessages, Iterables.size(entries)); - Assert.assertThat(entries, everyItem(entryWithRequest(request))); + assertEquals(sentMessages, Iterables.size(entries)); + assertThat(entries, everyItem(entryWithRequest(request))); } @Test @Override public void testCanTransmitCount() throws Exception { - Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0); - Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0); + assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0); + assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0); } @Test @Override public void testTransmit() throws Exception { - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); final ConnectionEntry entry = new ConnectionEntry(request, callback, now); queue.transmit(entry, now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); - Assert.assertEquals(request, requestEnvelope.getMessage()); + assertEquals(request, requestEnvelope.getMessage()); } @Test public void testSetForwarder() throws Exception { final FakeTicker ticker = new FakeTicker(); ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS); - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read()); queue.enqueue(entry, ticker.read()); @@ -144,4 +154,60 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + final Request req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref()); + final Request req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref()); + final Request req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref()); + final Request req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref()); + final Request req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref()); + final Consumer> callback = createConsumerMock(); + + // Fill the queue up to capacity + 1 + queue.enqueue(new ConnectionEntry(req0, callback, 0), 0); + queue.enqueue(new ConnectionEntry(req1, callback, 0), 0); + queue.enqueue(new ConnectionEntry(req2, callback, 0), 0); + queue.enqueue(new ConnectionEntry(req3, callback, 0), 0); + assertEqualRequests(queue.getInflight(), req0, req1, req2); + assertEqualRequests(queue.getPending(), req3); + + // Now complete req0, which should transmit req3 + queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0); + assertEqualRequests(queue.getInflight(), req1, req2, req3); + assertEqualRequests(queue.getPending()); + + // Now complete req1, which should leave an empty slot + queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0); + assertEqualRequests(queue.getInflight(), req2, req3); + assertEqualRequests(queue.getPending()); + + // Enqueue req4, which should be immediately transmitted + queue.enqueue(new ConnectionEntry(req4, callback, 0), 0); + assertEqualRequests(queue.getInflight(), req2, req3, req4); + assertEqualRequests(queue.getPending()); + + // Enqueue req5, which should move to pending + queue.enqueue(new ConnectionEntry(req5, callback, 0), 0); + assertEqualRequests(queue.getInflight(), req2, req3, req4); + assertEqualRequests(queue.getPending(), req5); + + // Remove req4, creating an inconsistency... + queue.getInflight().removeLast(); + assertEqualRequests(queue.getInflight(), req2, req3); + assertEqualRequests(queue.getPending(), req5); + + // ... and enqueue req6, which should cause req5 to be transmitted + queue.enqueue(new ConnectionEntry(req6, callback, 0), 0); + assertEqualRequests(queue.getInflight(), req2, req3, req5); + assertEqualRequests(queue.getPending(), req6); + } + + private static void assertEqualRequests(final Collection queue, + final Request... requests) { + final List> queued = ImmutableList.copyOf(Collections2.transform(queue, + ConnectionEntry::getRequest)); + assertEquals(Arrays.asList(requests), queued); + } +}