The timer which is supposed to timeout requests and detect
overall badness of the backeend was not being armed. Fix that
by scheduling it whenever we make the queue non-empty.
Change-Id: I9d8be694e3ed5154b66baca76c0788840a38c2f7
Signed-off-by: Robert Varga <rovarga@cisco.com>
@VisibleForTesting
static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
@VisibleForTesting
static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+ private static final FiniteDuration REQUEST_TIMEOUT_DURATION = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
+ TimeUnit.NANOSECONDS);
+
private final Lock lock = new ReentrantLock();
private final ClientActorContext context;
@GuardedBy("lock")
private final TransmitQueue queue;
private final Long cookie;
private final Lock lock = new ReentrantLock();
private final ClientActorContext context;
@GuardedBy("lock")
private final TransmitQueue queue;
private final Long cookie;
+ @GuardedBy("lock")
+ private boolean haveTimer;
+
private volatile RequestException poisoned;
// Do not allow subclassing outside of this package
private volatile RequestException poisoned;
// Do not allow subclassing outside of this package
@GuardedBy("lock")
final void finishReplay(final ReconnectForwarder forwarder) {
@GuardedBy("lock")
final void finishReplay(final ReconnectForwarder forwarder) {
- queue.setForwarder(forwarder, readTime());
+ setForwarder(forwarder);
final long enqueueEntry(final ConnectionEntry entry, final long now) {
lock.lock();
try {
final long enqueueEntry(final ConnectionEntry entry, final long now) {
lock.lock();
try {
+ if (queue.isEmpty()) {
+ // The queue is becoming non-empty, schedule a timer
+ scheduleTimer(REQUEST_TIMEOUT_DURATION);
+ }
return queue.enqueue(entry, now);
} finally {
lock.unlock();
return queue.enqueue(entry, now);
} finally {
lock.unlock();
try {
TimeUnit.NANOSECONDS.sleep(delay);
} catch (InterruptedException e) {
try {
TimeUnit.NANOSECONDS.sleep(delay);
} catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping");
+ LOG.debug("Interrupted while sleeping", e);
*
* @param delay Delay, in nanoseconds
*/
*
* @param delay Delay, in nanoseconds
*/
private void scheduleTimer(final FiniteDuration delay) {
private void scheduleTimer(final FiniteDuration delay) {
+ if (haveTimer) {
+ LOG.debug("{}: timer already scheduled", context.persistenceId());
+ return;
+ }
+ if (queue.hasSuccessor()) {
+ LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId());
+ return;
+ }
LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
context.executeInActor(this::runTimer, delay);
LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
context.executeInActor(this::runTimer, delay);
final long now = readTime();
// The following line is only reliable when queue is not forwarding, but such state should not last long.
final long ticksSinceProgress = queue.ticksStalling(now);
final long now = readTime();
// The following line is only reliable when queue is not forwarding, but such state should not last long.
final long ticksSinceProgress = queue.ticksStalling(now);
// We have timed out. There is no point in scheduling a timer
return reconnectConnection(current);
}
// We have timed out. There is no point in scheduling a timer
return reconnectConnection(current);
}
+
+ if (delay.isPresent()) {
+ // If there is new delay, schedule a timer
+ scheduleTimer(delay.get());
+ }
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
- if (delay.isPresent()) {
- // If there is new delay, schedule a timer
- scheduleTimer(delay.get());
- }
-
return tracker.ticksStalling(now);
}
return tracker.ticksStalling(now);
}
+ final boolean hasSuccessor() {
+ return successor != null;
+ }
+
// If a matching request was found, this will track a task was closed.
final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
// If a matching request was found, this will track a task was closed.
final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);