@VisibleForTesting
static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+ private static final FiniteDuration REQUEST_TIMEOUT_DURATION = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
+ TimeUnit.NANOSECONDS);
+
private final Lock lock = new ReentrantLock();
private final ClientActorContext context;
@GuardedBy("lock")
private final TransmitQueue queue;
private final Long cookie;
+ @GuardedBy("lock")
+ private boolean haveTimer;
+
private volatile RequestException poisoned;
// Do not allow subclassing outside of this package
@GuardedBy("lock")
final void finishReplay(final ReconnectForwarder forwarder) {
- queue.setForwarder(forwarder, readTime());
+ setForwarder(forwarder);
lock.unlock();
}
final long enqueueEntry(final ConnectionEntry entry, final long now) {
lock.lock();
try {
+ if (queue.isEmpty()) {
+ // The queue is becoming non-empty, schedule a timer
+ scheduleTimer(REQUEST_TIMEOUT_DURATION);
+ }
return queue.enqueue(entry, now);
} finally {
lock.unlock();
try {
TimeUnit.NANOSECONDS.sleep(delay);
} catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping");
+ LOG.debug("Interrupted while sleeping", e);
}
}
*
* @param delay Delay, in nanoseconds
*/
+ @GuardedBy("lock")
private void scheduleTimer(final FiniteDuration delay) {
+ if (haveTimer) {
+ LOG.debug("{}: timer already scheduled", context.persistenceId());
+ return;
+ }
+ if (queue.hasSuccessor()) {
+ LOG.debug("{}: connection has successor, not scheduling timer", context.persistenceId());
+ return;
+ }
LOG.debug("{}: scheduling timeout in {}", context.persistenceId(), delay);
context.executeInActor(this::runTimer, delay);
+ haveTimer = true;
}
/**
lock.lock();
try {
+ haveTimer = false;
final long now = readTime();
// The following line is only reliable when queue is not forwarding, but such state should not last long.
final long ticksSinceProgress = queue.ticksStalling(now);
// We have timed out. There is no point in scheduling a timer
return reconnectConnection(current);
}
+
+ if (delay.isPresent()) {
+ // If there is new delay, schedule a timer
+ scheduleTimer(delay.get());
+ }
} finally {
lock.unlock();
}
- if (delay.isPresent()) {
- // If there is new delay, schedule a timer
- scheduleTimer(delay.get());
- }
-
return current;
}