+ int tasksTimedOut = 0;
+ for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) {
+ final long beenOpen = now - head.getEnqueuedTicks();
+ if (beenOpen < REQUEST_TIMEOUT_NANOS) {
+ return Optional.of(REQUEST_TIMEOUT_NANOS - beenOpen);
+ }
+
+ tasksTimedOut++;
+ queue.remove(now);
+ LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
+
+ final double time = (beenOpen * 1.0) / 1_000_000_000;
+ head.complete(head.getRequest().toRequestFailure(
+ new RequestTimeoutException("Timed out after " + time + "seconds")));
+ }
+
+ LOG.debug("Connection {} timed out {} tasks", this, tasksTimedOut);
+ if (tasksTimedOut != 0) {
+ queue.tryTransmit(now);
+ }
+
+ return Optional.empty();