BUG-8403: guard against ConcurrentModificationException
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
index d384ba47369ce5dc034028d12cad11bb54f8e3ae..b7543410cd1a63128ac2da7ffde001cc9b3d778f 100644 (file)
@@ -8,11 +8,13 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.Queue;
@@ -95,8 +97,8 @@ abstract class TransmitQueue {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
 
-    private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
-    private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
+    private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
+    private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
     private final ProgressTracker tracker;
     private ReconnectForwarder successor;
 
@@ -104,8 +106,20 @@ abstract class TransmitQueue {
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
-    final Iterable<ConnectionEntry> asIterable() {
-        return Iterables.concat(inflight, pending);
+    /**
+     * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+     * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+     * returns and the successor is set will be replayed to the successor.
+     *
+     * @return Collection of entries present in the queue.
+     */
+    final Collection<ConnectionEntry> drain() {
+        final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+        ret.addAll(inflight);
+        ret.addAll(pending);
+        inflight.clear();
+        pending.clear();
+        return ret;
     }
 
     final long ticksStalling(final long now) {
@@ -133,19 +147,38 @@ abstract class TransmitQueue {
         tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
-        int toSend = canTransmitCount(inflight.size());
-        while (toSend > 0) {
+        tryTransmit(now);
+
+        return Optional.of(entry);
+    }
+
+    final void tryTransmit(final long now) {
+        final int toSend = canTransmitCount(inflight.size());
+        if (toSend > 0 && !pending.isEmpty()) {
+            transmitEntries(toSend, now);
+        }
+    }
+
+    private void transmitEntries(final int maxTransmit, final long now) {
+        for (int i = 0; i < maxTransmit; ++i) {
             final ConnectionEntry e = pending.poll();
             if (e == null) {
-                break;
+                LOG.debug("Queue {} transmitted {} requests", this, i);
+                return;
             }
 
-            LOG.debug("Transmitting entry {}", e);
-            transmit(e, now);
-            toSend--;
+            transmitEntry(e, now);
         }
 
-        return Optional.of(entry);
+        LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
+    }
+
+    private void transmitEntry(final ConnectionEntry entry, final long now) {
+        LOG.debug("Queue {} transmitting entry {}", entry);
+        // We are not thread-safe and are supposed to be externally-guarded,
+        // hence send-before-record should be fine.
+        // This needs to be revisited if the external guards are lowered.
+        inflight.addLast(transmit(entry, now));
     }
 
     /**
@@ -159,18 +192,30 @@ abstract class TransmitQueue {
             return 0;
         }
 
+        // XXX: we should place a guard against incorrect entry sequences:
+        // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
+
         // Reserve an entry before we do anything that can fail
         final long delay = tracker.openTask(now);
-        if (canTransmitCount(inflight.size()) <= 0) {
+
+        /*
+         * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
+         * to have available send slots and non-empty pending queue.
+         */
+        final int toSend = canTransmitCount(inflight.size());
+        if (toSend <= 0) {
             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
-            pending.add(entry);
-        } else {
-            // We are not thread-safe and are supposed to be externally-guarded,
-            // hence send-before-record should be fine.
-            // This needs to be revisited if the external guards are lowered.
-            inflight.offer(transmit(entry, now));
-            LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+            pending.addLast(entry);
+            return delay;
+        }
+
+        if (pending.isEmpty()) {
+            transmitEntry(entry, now);
+            return delay;
         }
+
+        pending.addLast(entry);
+        transmitEntries(toSend, now);
         return delay;
     }
 
@@ -204,19 +249,49 @@ abstract class TransmitQueue {
         successor = Preconditions.checkNotNull(forwarder);
         LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
 
+        /*
+         * We need to account for entries which have been added between the time drain() was called and this method
+         * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+         * replay thread, there was an avenue for this to happen.
+         */
+        int count = 0;
         ConnectionEntry entry = inflight.poll();
         while (entry != null) {
             successor.forwardEntry(entry, now);
             entry = inflight.poll();
+            count++;
         }
 
         entry = pending.poll();
         while (entry != null) {
             successor.forwardEntry(entry, now);
             entry = pending.poll();
+            count++;
+        }
+
+        LOG.debug("Connection {} queue spliced {} messages", this, count);
+    }
+
+    final void remove(final long now) {
+        final TransmittedConnectionEntry txe = inflight.poll();
+        if (txe == null) {
+            final ConnectionEntry entry = pending.pop();
+            tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
+        } else {
+            tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
         }
     }
 
+    @VisibleForTesting
+    Deque<TransmittedConnectionEntry> getInflight() {
+        return inflight;
+    }
+
+    @VisibleForTesting
+    Deque<ConnectionEntry> getPending() {
+        return pending;
+    }
+
     /*
      * We are using tri-state return here to indicate one of three conditions:
      * - if a matching entry is found, return an Optional containing it
@@ -279,5 +354,4 @@ abstract class TransmitQueue {
         }
         queue.clear();
     }
-
 }