BUG-8402: fix transmit accounting 16/57216/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 16 May 2017 14:49:42 +0000 (16:49 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Wed, 17 May 2017 01:19:20 +0000 (01:19 +0000)
CSIT has shown that during burst activity and leader movement
we can lose track of messages and the requests can arrive misordered.

As it turns out TransmitQueue.complete() transmit-on-response code
path fails to properly move the request to the in-flight queue.

Furthermore, opportunistic sending TransmitQueue.enqueue() could cause
message reordering if for some reason we have pending requests and
available transmit slot.

Fix this sharing the codepaths and making the TransmitQueue.enqueue()
check pending queue emptiness.

Change-Id: I2daf3d8b198e83c6f50f4a2f43b9e4c3cc091187
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/TransmittingTransmitQueueTest.java

index 9ab80d0..15ad958 100644 (file)
@@ -8,11 +8,13 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.Queue;
@@ -95,8 +97,8 @@ abstract class TransmitQueue {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
 
-    private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
-    private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
+    private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
+    private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
     private final ProgressTracker tracker;
     private ReconnectForwarder successor;
 
@@ -133,19 +135,34 @@ abstract class TransmitQueue {
         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
-        int toSend = canTransmitCount(inflight.size());
-        while (toSend > 0) {
+        final int toSend = canTransmitCount(inflight.size());
+        if (toSend > 0 && !pending.isEmpty()) {
+            transmitEntries(toSend, now);
+        }
+
+        return Optional.of(entry);
+    }
+
+    private void transmitEntries(final int maxTransmit, final long now) {
+        for (int i = 0; i < maxTransmit; ++i) {
             final ConnectionEntry e = pending.poll();
             if (e == null) {
-                break;
+                LOG.debug("Queue {} transmitted {} requests", this, i);
+                return;
             }
 
-            LOG.debug("Transmitting entry {}", e);
-            transmit(e, now);
-            toSend--;
+            transmitEntry(e, now);
         }
 
-        return Optional.of(entry);
+        LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
+    }
+
+    private void transmitEntry(final ConnectionEntry entry, final long now) {
+        LOG.debug("Queue {} transmitting entry {}", entry);
+        // We are not thread-safe and are supposed to be externally-guarded,
+        // hence send-before-record should be fine.
+        // This needs to be revisited if the external guards are lowered.
+        inflight.addLast(transmit(entry, now));
     }
 
     /**
@@ -164,16 +181,25 @@ abstract class TransmitQueue {
 
         // Reserve an entry before we do anything that can fail
         final long delay = tracker.openTask(now);
-        if (canTransmitCount(inflight.size()) <= 0) {
+
+        /*
+         * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
+         * to have available send slots and non-empty pending queue.
+         */
+        final int toSend = canTransmitCount(inflight.size());
+        if (toSend <= 0) {
             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
-            pending.add(entry);
-        } else {
-            // We are not thread-safe and are supposed to be externally-guarded,
-            // hence send-before-record should be fine.
-            // This needs to be revisited if the external guards are lowered.
-            inflight.offer(transmit(entry, now));
-            LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+            pending.addLast(entry);
+            return delay;
+        }
+
+        if (pending.isEmpty()) {
+            transmitEntry(entry, now);
+            return delay;
         }
+
+        pending.addLast(entry);
+        transmitEntries(toSend, now);
         return delay;
     }
 
@@ -220,6 +246,16 @@ abstract class TransmitQueue {
         }
     }
 
+    @VisibleForTesting
+    Deque<TransmittedConnectionEntry> getInflight() {
+        return inflight;
+    }
+
+    @VisibleForTesting
+    Deque<ConnectionEntry> getPending() {
+        return pending;
+    }
+
     /*
      * We are using tri-state return here to indicate one of three conditions:
      * - if a matching entry is found, return an Optional containing it
index 4859591..358cc9d 100644 (file)
@@ -8,6 +8,10 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import static org.hamcrest.CoreMatchers.everyItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
@@ -15,18 +19,24 @@ import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
 
 import com.google.common.base.Ticker;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.testing.FakeTicker;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.ABIVersion;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
+import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
@@ -51,9 +61,9 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
     public void testComplete() throws Exception {
         final long sequence1 = 0L;
         final long sequence2 = 1L;
-        final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
+        final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
         final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
-        final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
+        final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
         final Consumer<Response<?, ?>> callback1 = createConsumerMock();
         final Consumer<Response<?, ?>> callback2 = createConsumerMock();
         final long now1 = Ticker.systemTicker().read();
@@ -70,29 +80,29 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
                 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
         //check first entry
         final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
-        Assert.assertEquals(transmittedEntry1.getRequest(), request1);
-        Assert.assertEquals(transmittedEntry1.getTxSequence(), sequence1);
-        Assert.assertEquals(transmittedEntry1.getCallback(), callback1);
+        assertEquals(transmittedEntry1.getRequest(), request1);
+        assertEquals(transmittedEntry1.getTxSequence(), sequence1);
+        assertEquals(transmittedEntry1.getCallback(), callback1);
         //check second entry
         final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
-        Assert.assertEquals(transmittedEntry2.getRequest(), request2);
-        Assert.assertEquals(transmittedEntry2.getTxSequence(), sequence2);
-        Assert.assertEquals(transmittedEntry2.getCallback(), callback2);
+        assertEquals(transmittedEntry2.getRequest(), request2);
+        assertEquals(transmittedEntry2.getTxSequence(), sequence2);
+        assertEquals(transmittedEntry2.getCallback(), callback2);
     }
 
     @Test
     public void testEnqueueCanTransmit() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         queue.enqueue(new ConnectionEntry(request, callback, now), now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
-        Assert.assertEquals(request, requestEnvelope.getMessage());
+        assertEquals(request, requestEnvelope.getMessage());
     }
 
     @Test
     public void testEnqueueBackendFull() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final int sentMessages = getMaxInFlightMessages() + 1;
@@ -104,34 +114,34 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         }
         probe.expectNoMsg();
         final Iterable<ConnectionEntry> entries = queue.asIterable();
-        Assert.assertEquals(sentMessages, Iterables.size(entries));
-        Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+        assertEquals(sentMessages, Iterables.size(entries));
+        assertThat(entries, everyItem(entryWithRequest(request)));
     }
 
     @Test
     @Override
     public void testCanTransmitCount() throws Exception {
-        Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
-        Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
+        assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
+        assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
     }
 
     @Test
     @Override
     public void testTransmit() throws Exception {
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final long now = Ticker.systemTicker().read();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
         queue.transmit(entry, now);
         final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
-        Assert.assertEquals(request, requestEnvelope.getMessage());
+        assertEquals(request, requestEnvelope.getMessage());
     }
 
     @Test
     public void testSetForwarder() throws Exception {
         final FakeTicker ticker = new FakeTicker();
         ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
-        final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
         final Consumer<Response<?, ?>> callback = createConsumerMock();
         final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
         queue.enqueue(entry, ticker.read());
@@ -144,4 +154,60 @@ public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<Tra
         verify(forwarder).forwardEntry(entry, secondEnqueueNow);
     }
 
-}
\ No newline at end of file
+    @Test
+    public void testCompleteOrdering() {
+        final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+        final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+        final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
+        final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+        final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
+        final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
+        final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
+        final Consumer<Response<?, ?>> callback = createConsumerMock();
+
+        // Fill the queue up to capacity + 1
+        queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
+        queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
+        queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
+        queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+        assertEqualRequests(queue.getInflight(), req0, req1, req2);
+        assertEqualRequests(queue.getPending(), req3);
+
+        // Now complete req0, which should transmit req3
+        queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
+        assertEqualRequests(queue.getInflight(), req1, req2, req3);
+        assertEqualRequests(queue.getPending());
+
+        // Now complete req1, which should leave an empty slot
+        queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
+        assertEqualRequests(queue.getInflight(), req2, req3);
+        assertEqualRequests(queue.getPending());
+
+        // Enqueue req4, which should be immediately transmitted
+        queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+        assertEqualRequests(queue.getInflight(), req2, req3, req4);
+        assertEqualRequests(queue.getPending());
+
+        // Enqueue req5, which should move to pending
+        queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+        assertEqualRequests(queue.getInflight(), req2, req3, req4);
+        assertEqualRequests(queue.getPending(), req5);
+
+        // Remove req4, creating an inconsistency...
+        queue.getInflight().removeLast();
+        assertEqualRequests(queue.getInflight(), req2, req3);
+        assertEqualRequests(queue.getPending(), req5);
+
+        // ... and enqueue req6, which should cause req5 to be transmitted
+        queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+        assertEqualRequests(queue.getInflight(), req2, req3, req5);
+        assertEqualRequests(queue.getPending(), req6);
+    }
+
+    private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
+            final Request<?, ?>... requests) {
+        final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
+            ConnectionEntry::getRequest));
+        assertEquals(Arrays.asList(requests), queued);
+    }
+}

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.