Bug 5280: Add ProgressTracker
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / TransmitQueue.java
index e1c5589004ec6685b086efacc32972d890516d8b..4a1b3a2f293a499a64e805250a5a3776222caa67 100644 (file)
@@ -52,6 +52,10 @@ import org.slf4j.LoggerFactory;
 @NotThreadSafe
 abstract class TransmitQueue {
     static final class Halted extends TransmitQueue {
+        Halted(final int targetDepth) {
+            super(targetDepth);
+        }
+
         @Override
         int canTransmitCount(final int inflightSize) {
             return 0;
@@ -67,7 +71,8 @@ abstract class TransmitQueue {
         private final BackendInfo backend;
         private long nextTxSequence;
 
-        Transmitting(final BackendInfo backend) {
+        Transmitting(final int targetDepth, final BackendInfo backend) {
+            super(targetDepth);
             this.backend = Preconditions.checkNotNull(backend);
         }
 
@@ -92,18 +97,22 @@ abstract class TransmitQueue {
 
     private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
     private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
-
+    private final ProgressTracker tracker;
     private ReconnectForwarder successor;
 
+    TransmitQueue(final int targetDepth) {
+        tracker = new AveragingProgressTracker(targetDepth);
+    }
+
     final Iterable<ConnectionEntry> asIterable() {
         return Iterables.concat(inflight, pending);
     }
 
-    private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks,
-            final long execNanos) {
-        // TODO: record
+    final long ticksStalling(final long now) {
+        return tracker.ticksStalling(now);
     }
 
+    // If a matching request was found, this will track a task was closed.
     final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
         Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
         if (maybeEntry == null) {
@@ -117,7 +126,7 @@ abstract class TransmitQueue {
         }
 
         final TransmittedConnectionEntry entry = maybeEntry.get();
-        recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
+        tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
 
         // We have freed up a slot, try to transmit something
         int toSend = canTransmitCount(inflight.size());
@@ -135,24 +144,35 @@ abstract class TransmitQueue {
         return Optional.of(entry);
     }
 
-    final void enqueue(final ConnectionEntry entry, final long now) {
+    /**
+     * Enqueue an entry, possibly also transmitting it.
+     *
+     * @return Delay to be forced on the calling thread, in nanoseconds.
+     */
+    final long enqueue(final ConnectionEntry entry, final long now) {
         if (successor != null) {
             successor.forwardEntry(entry, now);
-            return;
+            return 0;
         }
 
+        // Reserve an entry before we do anything that can fail
+        final long delay = tracker.openTask(now);
         if (canTransmitCount(inflight.size()) <= 0) {
             LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
             pending.add(entry);
-            return;
+        } else {
+            // We are not thread-safe and are supposed to be externally-guarded,
+            // hence send-before-record should be fine.
+            // This needs to be revisited if the external guards are lowered.
+            inflight.offer(transmit(entry, now));
+            LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
         }
-
-        // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine.
-        // This needs to be revisited if the external guards are lowered.
-        inflight.offer(transmit(entry, now));
-        LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+        return delay;
     }
 
+    /**
+     * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
+     */
     abstract int canTransmitCount(int inflightSize);
 
     abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);