BUG-8403: guard against ConcurrentModificationException
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
index 28902d28ba2142f59af6dbc055a4933c2eb76b3a..b7543410cd1a63128ac2da7ffde001cc9b3d778f 100644 (file)
@@ -11,9 +11,9 @@ import akka.actor.ActorRef;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.Optional;
@@ -106,8 +106,20 @@ abstract class TransmitQueue {
         tracker = new AveragingProgressTracker(targetDepth);
     }
 
-    final Iterable<ConnectionEntry> asIterable() {
-        return Iterables.concat(inflight, pending);
+    /**
+     * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+     * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+     * returns and the successor is set will be replayed to the successor.
+     *
+     * @return Collection of entries present in the queue.
+     */
+    final Collection<ConnectionEntry> drain() {
+        final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+        ret.addAll(inflight);
+        ret.addAll(pending);
+        inflight.clear();
+        pending.clear();
+        return ret;
     }
 
     final long ticksStalling(final long now) {
@@ -232,13 +244,32 @@ abstract class TransmitQueue {
         poisonQueue(pending, cause);
     }
 
-    final void setForwarder(final ReconnectForwarder forwarder) {
+    final void setForwarder(final ReconnectForwarder forwarder, final long now) {
         Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
-        Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
-        Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
-
         successor = Preconditions.checkNotNull(forwarder);
-        LOG.debug("Connection {} superseded by {}", this, successor);
+        LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+
+        /*
+         * We need to account for entries which have been added between the time drain() was called and this method
+         * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+         * replay thread, there was an avenue for this to happen.
+         */
+        int count = 0;
+        ConnectionEntry entry = inflight.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry, now);
+            entry = inflight.poll();
+            count++;
+        }
+
+        entry = pending.poll();
+        while (entry != null) {
+            successor.forwardEntry(entry, now);
+            entry = pending.poll();
+            count++;
+        }
+
+        LOG.debug("Connection {} queue spliced {} messages", this, count);
     }
 
     final void remove(final long now) {