+ int tasksTimedOut = 0;
+ for (ConnectionEntry head = queue.peek(); head != null; head = queue.peek()) {
+ final long beenOpen = now - head.getEnqueuedTicks();
+ final long requestTimeout = context.config().getRequestTimeout();
+ if (beenOpen < requestTimeout) {
+ return Optional.of(requestTimeout - beenOpen);
+ }
+
+ tasksTimedOut++;
+ queue.remove(now);
+ LOG.debug("{}: Connection {} timed out entry {}", context.persistenceId(), this, head);
+
+ timeoutEntry(head, beenOpen);
+ }
+
+ LOG.debug("Connection {} timed out {} tasks", this, tasksTimedOut);
+ if (tasksTimedOut != 0) {
+ queue.tryTransmit(now);
+ }
+
+ return Optional.empty();
+ }
+
+ private void timeoutEntry(final ConnectionEntry entry, final long beenOpen) {
+ // Timeouts needs to be re-scheduled on actor thread because we are holding the lock on the current queue,
+ // which may be the tail of a successor chain. This is a problem if the callback attempts to send a request
+ // because that will attempt to lock the chain from the start, potentially causing a deadlock if there is
+ // a concurrent attempt to transmit.
+ context.executeInActor(current -> {
+ final double time = beenOpen * 1.0 / 1_000_000_000;
+ entry.complete(entry.getRequest().toRequestFailure(
+ new RequestTimeoutException(entry.getRequest() + " timed out after " + time
+ + " seconds. The backend for " + backendName + " is not available.")));
+ return current;
+ });