import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
tracker = new AveragingProgressTracker(targetDepth);
}
- final Iterable<ConnectionEntry> asIterable() {
- return Iterables.concat(inflight, pending);
+ /**
+ * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+ * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+ * returns and the successor is set will be replayed to the successor.
+ *
+ * @return Collection of entries present in the queue.
+ */
+ final Collection<ConnectionEntry> drain() {
+ final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+ ret.addAll(inflight);
+ ret.addAll(pending);
+ inflight.clear();
+ pending.clear();
+ return ret;
}
final long ticksStalling(final long now) {
}
private void transmitEntry(final ConnectionEntry entry, final long now) {
- LOG.debug("Queue {} transmitting entry {}", entry);
+ LOG.debug("Queue {} transmitting entry {}", this, entry);
// We are not thread-safe and are supposed to be externally-guarded,
// hence send-before-record should be fine.
// This needs to be revisited if the external guards are lowered.
*/
final long enqueue(final ConnectionEntry entry, final long now) {
if (successor != null) {
+ // This call will pay the enqueuing price, hence the caller does not have to
successor.forwardEntry(entry, now);
return 0;
}
poisonQueue(pending, cause);
}
- final void setForwarder(final ReconnectForwarder forwarder) {
- Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
- Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
- Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
-
+ final void setForwarder(final ReconnectForwarder forwarder, final long now) {
+ Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this);
successor = Preconditions.checkNotNull(forwarder);
- LOG.debug("Connection {} superseded by {}", this, successor);
+ LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+
+ /*
+ * We need to account for entries which have been added between the time drain() was called and this method
+ * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+ * replay thread, there was an avenue for this to happen.
+ */
+ int count = 0;
+ ConnectionEntry entry = inflight.poll();
+ while (entry != null) {
+ successor.replayEntry(entry, now);
+ entry = inflight.poll();
+ count++;
+ }
+
+ entry = pending.poll();
+ while (entry != null) {
+ successor.replayEntry(entry, now);
+ entry = pending.poll();
+ count++;
+ }
+
+ LOG.debug("Connection {} queue spliced {} messages", this, count);
}
final void remove(final long now) {