From: Robert Varga Date: Tue, 16 May 2017 14:49:42 +0000 (+0200) Subject: BUG-8402: fix transmit accounting X-Git-Tag: release/nitrogen~217 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=503d824302de98ae7d9fd44c6c417ed651865919 BUG-8402: fix transmit accounting CSIT has shown that during burst activity and leader movement we can lose track of messages and the requests can arrive misordered. As it turns out TransmitQueue.complete() transmit-on-response code path fails to properly move the request to the in-flight queue. Furthermore, opportunistic sending TransmitQueue.enqueue() could cause message reordering if for some reason we have pending requests and available transmit slot. Fix this sharing the codepaths and making the TransmitQueue.enqueue() check pending queue emptiness. Change-Id: I2daf3d8b198e83c6f50f4a2f43b9e4c3cc091187 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 9ab80d0d00..15ad958304 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -8,11 +8,13 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.Iterables; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.Deque; import java.util.Iterator; import java.util.Optional; import java.util.Queue; @@ -95,8 +97,8 @@ abstract class TransmitQueue { private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class); - private final ArrayDeque inflight = new ArrayDeque<>(); - private final ArrayDeque pending = new ArrayDeque<>(); + private final Deque inflight = new ArrayDeque<>(); + private final Deque pending = new ArrayDeque<>(); private final ProgressTracker tracker; private ReconnectForwarder successor; @@ -133,19 +135,34 @@ abstract class TransmitQueue { tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); // We have freed up a slot, try to transmit something - int toSend = canTransmitCount(inflight.size()); - while (toSend > 0) { + final int toSend = canTransmitCount(inflight.size()); + if (toSend > 0 && !pending.isEmpty()) { + transmitEntries(toSend, now); + } + + return Optional.of(entry); + } + + private void transmitEntries(final int maxTransmit, final long now) { + for (int i = 0; i < maxTransmit; ++i) { final ConnectionEntry e = pending.poll(); if (e == null) { - break; + LOG.debug("Queue {} transmitted {} requests", this, i); + return; } - LOG.debug("Transmitting entry {}", e); - transmit(e, now); - toSend--; + transmitEntry(e, now); } - return Optional.of(entry); + LOG.debug("Queue {} transmitted {} requests", this, maxTransmit); + } + + private void transmitEntry(final ConnectionEntry entry, final long now) { + LOG.debug("Queue {} transmitting entry {}", entry); + // We are not thread-safe and are supposed to be externally-guarded, + // hence send-before-record should be fine. + // This needs to be revisited if the external guards are lowered. + inflight.addLast(transmit(entry, now)); } /** @@ -164,16 +181,25 @@ abstract class TransmitQueue { // Reserve an entry before we do anything that can fail final long delay = tracker.openTask(now); - if (canTransmitCount(inflight.size()) <= 0) { + + /* + * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen + * to have available send slots and non-empty pending queue. + */ + final int toSend = canTransmitCount(inflight.size()); + if (toSend <= 0) { LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest()); - pending.add(entry); - } else { - // We are not thread-safe and are supposed to be externally-guarded, - // hence send-before-record should be fine. - // This needs to be revisited if the external guards are lowered. - inflight.offer(transmit(entry, now)); - LOG.debug("Sent request {} on queue {}", entry.getRequest(), this); + pending.addLast(entry); + return delay; + } + + if (pending.isEmpty()) { + transmitEntry(entry, now); + return delay; } + + pending.addLast(entry); + transmitEntries(toSend, now); return delay; } @@ -220,6 +246,16 @@ abstract class TransmitQueue { } } + @VisibleForTesting + Deque getInflight() { + return inflight; + } + + @VisibleForTesting + Deque getPending() { + return pending; + } + /* * We are using tri-state return here to indicate one of three conditions: * - if a matching entry is found, return an Optional containing it diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java index 4859591384..358cc9d408 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java @@ -8,6 +8,10 @@ package org.opendaylight.controller.cluster.access.client; import static org.hamcrest.CoreMatchers.everyItem; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; @@ -15,18 +19,24 @@ import static org.mockito.Mockito.verify; import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest; import com.google.common.base.Ticker; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.testing.FakeTicker; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse; +import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope; @@ -51,9 +61,9 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref()); final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L); - final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref()); + final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref()); final Consumer> callback1 = createConsumerMock(); final Consumer> callback2 = createConsumerMock(); final long now1 = Ticker.systemTicker().read(); @@ -70,29 +80,29 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); queue.enqueue(new ConnectionEntry(request, callback, now), now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); - Assert.assertEquals(request, requestEnvelope.getMessage()); + assertEquals(request, requestEnvelope.getMessage()); } @Test public void testEnqueueBackendFull() throws Exception { - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); final int sentMessages = getMaxInFlightMessages() + 1; @@ -104,34 +114,34 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest entries = queue.asIterable(); - Assert.assertEquals(sentMessages, Iterables.size(entries)); - Assert.assertThat(entries, everyItem(entryWithRequest(request))); + assertEquals(sentMessages, Iterables.size(entries)); + assertThat(entries, everyItem(entryWithRequest(request))); } @Test @Override public void testCanTransmitCount() throws Exception { - Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0); - Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0); + assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0); + assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0); } @Test @Override public void testTransmit() throws Exception { - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final long now = Ticker.systemTicker().read(); final ConnectionEntry entry = new ConnectionEntry(request, callback, now); queue.transmit(entry, now); final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class); - Assert.assertEquals(request, requestEnvelope.getMessage()); + assertEquals(request, requestEnvelope.getMessage()); } @Test public void testSetForwarder() throws Exception { final FakeTicker ticker = new FakeTicker(); ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS); - final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); final Consumer> callback = createConsumerMock(); final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read()); queue.enqueue(entry, ticker.read()); @@ -144,4 +154,60 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref()); + final Request req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref()); + final Request req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref()); + final Request req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref()); + final Request req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref()); + final Request req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref()); + final Request req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref()); + final Consumer> callback = createConsumerMock(); + + // Fill the queue up to capacity + 1 + queue.enqueue(new ConnectionEntry(req0, callback, 0), 0); + queue.enqueue(new ConnectionEntry(req1, callback, 0), 0); + queue.enqueue(new ConnectionEntry(req2, callback, 0), 0); + queue.enqueue(new ConnectionEntry(req3, callback, 0), 0); + assertEqualRequests(queue.getInflight(), req0, req1, req2); + assertEqualRequests(queue.getPending(), req3); + + // Now complete req0, which should transmit req3 + queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0); + assertEqualRequests(queue.getInflight(), req1, req2, req3); + assertEqualRequests(queue.getPending()); + + // Now complete req1, which should leave an empty slot + queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0); + assertEqualRequests(queue.getInflight(), req2, req3); + assertEqualRequests(queue.getPending()); + + // Enqueue req4, which should be immediately transmitted + queue.enqueue(new ConnectionEntry(req4, callback, 0), 0); + assertEqualRequests(queue.getInflight(), req2, req3, req4); + assertEqualRequests(queue.getPending()); + + // Enqueue req5, which should move to pending + queue.enqueue(new ConnectionEntry(req5, callback, 0), 0); + assertEqualRequests(queue.getInflight(), req2, req3, req4); + assertEqualRequests(queue.getPending(), req5); + + // Remove req4, creating an inconsistency... + queue.getInflight().removeLast(); + assertEqualRequests(queue.getInflight(), req2, req3); + assertEqualRequests(queue.getPending(), req5); + + // ... and enqueue req6, which should cause req5 to be transmitted + queue.enqueue(new ConnectionEntry(req6, callback, 0), 0); + assertEqualRequests(queue.getInflight(), req2, req3, req5); + assertEqualRequests(queue.getPending(), req6); + } + + private static void assertEqualRequests(final Collection queue, + final Request... requests) { + final List> queued = ImmutableList.copyOf(Collections2.transform(queue, + ConnectionEntry::getRequest)); + assertEquals(Arrays.asList(requests), queued); + } +}