package org.opendaylight.controller.cluster.access.client;
import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
@NotThreadSafe
abstract class TransmitQueue {
static final class Halted extends TransmitQueue {
+ Halted(final int targetDepth) {
+ super(targetDepth);
+ }
+
@Override
int canTransmitCount(final int inflightSize) {
return 0;
private final BackendInfo backend;
private long nextTxSequence;
- Transmitting(final BackendInfo backend) {
+ Transmitting(final int targetDepth, final BackendInfo backend) {
+ super(targetDepth);
this.backend = Preconditions.checkNotNull(backend);
}
private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
- private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
- private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
-
+ private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
+ private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
+ private final ProgressTracker tracker;
private ReconnectForwarder successor;
- final Iterable<ConnectionEntry> asIterable() {
- return Iterables.concat(inflight, pending);
+ TransmitQueue(final int targetDepth) {
+ tracker = new AveragingProgressTracker(targetDepth);
+ }
+
+ /**
+ * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+ * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+ * returns and the successor is set will be replayed to the successor.
+ *
+ * @return Collection of entries present in the queue.
+ */
+ final Collection<ConnectionEntry> drain() {
+ final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
+ ret.addAll(inflight);
+ ret.addAll(pending);
+ inflight.clear();
+ pending.clear();
+ return ret;
+ }
+
+ final long ticksStalling(final long now) {
+ return tracker.ticksStalling(now);
}
- private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks,
- final long execNanos) {
- // TODO: record
+ final boolean hasSuccessor() {
+ return successor != null;
}
+ // If a matching request was found, this will track a task was closed.
final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
}
final TransmittedConnectionEntry entry = maybeEntry.get();
- recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
+ tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
- int toSend = canTransmitCount(inflight.size());
- while (toSend > 0) {
+ tryTransmit(now);
+
+ return Optional.of(entry);
+ }
+
+ final void tryTransmit(final long now) {
+ final int toSend = canTransmitCount(inflight.size());
+ if (toSend > 0 && !pending.isEmpty()) {
+ transmitEntries(toSend, now);
+ }
+ }
+
+ private void transmitEntries(final int maxTransmit, final long now) {
+ for (int i = 0; i < maxTransmit; ++i) {
final ConnectionEntry e = pending.poll();
if (e == null) {
- break;
+ LOG.debug("Queue {} transmitted {} requests", this, i);
+ return;
}
- LOG.debug("Transmitting entry {}", e);
- transmit(e, now);
- toSend--;
+ transmitEntry(e, now);
}
- return Optional.of(entry);
+ LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
+ }
+
+ private void transmitEntry(final ConnectionEntry entry, final long now) {
+ LOG.debug("Queue {} transmitting entry {}", entry);
+ // We are not thread-safe and are supposed to be externally-guarded,
+ // hence send-before-record should be fine.
+ // This needs to be revisited if the external guards are lowered.
+ inflight.addLast(transmit(entry, now));
}
- final void enqueue(final ConnectionEntry entry, final long now) {
+ /**
+ * Enqueue an entry, possibly also transmitting it.
+ *
+ * @return Delay to be forced on the calling thread, in nanoseconds.
+ */
+ final long enqueue(final ConnectionEntry entry, final long now) {
if (successor != null) {
+ // This call will pay the enqueuing price, hence the caller does not have to
successor.forwardEntry(entry, now);
- return;
+ return 0;
}
- if (canTransmitCount(inflight.size()) <= 0) {
+ // XXX: we should place a guard against incorrect entry sequences:
+ // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
+
+ // Reserve an entry before we do anything that can fail
+ final long delay = tracker.openTask(now);
+
+ /*
+ * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
+ * to have available send slots and non-empty pending queue.
+ */
+ final int toSend = canTransmitCount(inflight.size());
+ if (toSend <= 0) {
LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
- pending.add(entry);
- return;
+ pending.addLast(entry);
+ return delay;
}
- // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine.
- // This needs to be revisited if the external guards are lowered.
- inflight.offer(transmit(entry, now));
- LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+ if (pending.isEmpty()) {
+ transmitEntry(entry, now);
+ return delay;
+ }
+
+ pending.addLast(entry);
+ transmitEntries(toSend, now);
+ return delay;
}
+ /**
+ * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
+ */
abstract int canTransmitCount(int inflightSize);
abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
successor = Preconditions.checkNotNull(forwarder);
LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+ /*
+ * We need to account for entries which have been added between the time drain() was called and this method
+ * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+ * replay thread, there was an avenue for this to happen.
+ */
+ int count = 0;
ConnectionEntry entry = inflight.poll();
while (entry != null) {
- successor.forwardEntry(entry, now);
+ successor.replayEntry(entry, now);
entry = inflight.poll();
+ count++;
}
entry = pending.poll();
while (entry != null) {
- successor.forwardEntry(entry, now);
+ successor.replayEntry(entry, now);
entry = pending.poll();
+ count++;
}
+
+ LOG.debug("Connection {} queue spliced {} messages", this, count);
+ }
+
+ final void remove(final long now) {
+ final TransmittedConnectionEntry txe = inflight.poll();
+ if (txe == null) {
+ final ConnectionEntry entry = pending.pop();
+ tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
+ } else {
+ tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
+ }
+ }
+
+ @VisibleForTesting
+ Deque<TransmittedConnectionEntry> getInflight() {
+ return inflight;
+ }
+
+ @VisibleForTesting
+ Deque<ConnectionEntry> getPending() {
+ return pending;
}
/*
}
queue.clear();
}
-
}