+ int tasksTimedOut = 0;
+ for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) {
+ final long beenOpen = now - head.getEnqueuedTicks();
+ if (beenOpen < REQUEST_TIMEOUT_NANOS) {
+ return Optional.of(REQUEST_TIMEOUT_NANOS - beenOpen);
+ }
+
+ tasksTimedOut++;
+ queue.remove(now);
+ LOG.debug("Connection {} timed out entryt {}", this, head);
+ head.complete(head.getRequest().toRequestFailure(
+ new RequestTimeoutException("Timed out after " + beenOpen + "ns")));
+ }
+
+ LOG.debug("Connection {} timed out {} tasks", this, tasksTimedOut);
+ if (tasksTimedOut != 0) {
+ queue.tryTransmit(now);
+ }
+
+ return Optional.empty();