X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmitQueue.java;h=4a1b3a2f293a499a64e805250a5a3776222caa67;hp=e1c5589004ec6685b086efacc32972d890516d8b;hb=a36d5af3e383cbddc31527a6d05bc23de3f3571d;hpb=c0ddac051a1eec4ac2b12191ce61b6fcec265772;ds=sidebyside diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index e1c5589004..4a1b3a2f29 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -52,6 +52,10 @@ import org.slf4j.LoggerFactory; @NotThreadSafe abstract class TransmitQueue { static final class Halted extends TransmitQueue { + Halted(final int targetDepth) { + super(targetDepth); + } + @Override int canTransmitCount(final int inflightSize) { return 0; @@ -67,7 +71,8 @@ abstract class TransmitQueue { private final BackendInfo backend; private long nextTxSequence; - Transmitting(final BackendInfo backend) { + Transmitting(final int targetDepth, final BackendInfo backend) { + super(targetDepth); this.backend = Preconditions.checkNotNull(backend); } @@ -92,18 +97,22 @@ abstract class TransmitQueue { private final ArrayDeque inflight = new ArrayDeque<>(); private final ArrayDeque pending = new ArrayDeque<>(); - + private final ProgressTracker tracker; private ReconnectForwarder successor; + TransmitQueue(final int targetDepth) { + tracker = new AveragingProgressTracker(targetDepth); + } + final Iterable asIterable() { return Iterables.concat(inflight, pending); } - private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks, - final long execNanos) { - // TODO: record + final long ticksStalling(final long now) { + return tracker.ticksStalling(now); } + // If a matching request was found, this will track a task was closed. final Optional complete(final ResponseEnvelope envelope, final long now) { Optional maybeEntry = findMatchingEntry(inflight, envelope); if (maybeEntry == null) { @@ -117,7 +126,7 @@ abstract class TransmitQueue { } final TransmittedConnectionEntry entry = maybeEntry.get(); - recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); + tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); // We have freed up a slot, try to transmit something int toSend = canTransmitCount(inflight.size()); @@ -135,24 +144,35 @@ abstract class TransmitQueue { return Optional.of(entry); } - final void enqueue(final ConnectionEntry entry, final long now) { + /** + * Enqueue an entry, possibly also transmitting it. + * + * @return Delay to be forced on the calling thread, in nanoseconds. + */ + final long enqueue(final ConnectionEntry entry, final long now) { if (successor != null) { successor.forwardEntry(entry, now); - return; + return 0; } + // Reserve an entry before we do anything that can fail + final long delay = tracker.openTask(now); if (canTransmitCount(inflight.size()) <= 0) { LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest()); pending.add(entry); - return; + } else { + // We are not thread-safe and are supposed to be externally-guarded, + // hence send-before-record should be fine. + // This needs to be revisited if the external guards are lowered. + inflight.offer(transmit(entry, now)); + LOG.debug("Sent request {} on queue {}", entry.getRequest(), this); } - - // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine. - // This needs to be revisited if the external guards are lowered. - inflight.offer(transmit(entry, now)); - LOG.debug("Sent request {} on queue {}", entry.getRequest(), this); + return delay; } + /** + * Return the number of entries which can be transmitted assuming the supplied in-flight queue size. + */ abstract int canTransmitCount(int inflightSize); abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);