// Emit a debug entry if we sleep for more that this amount
private static final long DEBUG_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
+ // Upper bound on the time a thread is forced to sleep to keep queue size under control
+ private static final long MAX_DELAY_SECONDS = 5;
+ private static final long MAX_DELAY_NANOS = TimeUnit.SECONDS.toNanos(MAX_DELAY_SECONDS);
+
private final Lock lock = new ReentrantLock();
private final ClientActorContext context;
@GuardedBy("lock")
*/
public final void sendRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback) {
final long now = currentTime();
- final long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
+ long delay = enqueueEntry(new ConnectionEntry(request, callback, now), now);
try {
- if (delay >= DEBUG_DELAY_NANOS && LOG.isDebugEnabled()) {
- LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay));
+ if (delay >= DEBUG_DELAY_NANOS) {
+ if (delay > MAX_DELAY_NANOS) {
+ LOG.info("Capping {} throttle delay from {} to {} seconds", this,
+ TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS);
+ delay = MAX_DELAY_NANOS;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sleeping for {}ms", TimeUnit.NANOSECONDS.toMillis(delay));
+ }
}
TimeUnit.NANOSECONDS.sleep(delay);
} catch (InterruptedException e) {
@GuardedBy("lock")
final void finishReplay(final ReconnectForwarder forwarder) {
setForwarder(forwarder);
+
+ /*
+ * The process of replaying all messages may have taken a significant chunk of time, depending on type
+ * of messages, queue depth and available processing power. In extreme situations this may have already
+ * exceeded BACKEND_ALIVE_TIMEOUT_NANOS, in which case we are running the risk of not making reasonable forward
+ * progress before we start a reconnect cycle.
+ *
+ * Note that the timer is armed after we have sent the first message, hence we should be seeing a response
+ * from the backend before we see a timeout, simply due to how the mailbox operates.
+ *
+ * At any rate, reset the timestamp once we complete reconnection (which an atomic transition from the
+ * perspective of outside world), as that makes it a bit easier to reason about timing of events.
+ */
+ lastReceivedTicks = currentTime();
lock.unlock();
}
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Verify;
import java.util.Collection;
import java.util.Map;
LOG.info("{}: resolved shard {} to {}", persistenceId(), shard, backend);
final long stamp = connectionsLock.writeLock();
try {
+ final Stopwatch sw = Stopwatch.createStarted();
+
// Create a new connected connection
final ConnectedClientConnection<T> newConn = new ConnectedClientConnection<>(conn.context(),
conn.cookie(), backend);
LOG.warn("{}: old connection {} does not match existing {}, new connection {} in limbo",
persistenceId(), conn, existing, newConn);
} else {
- LOG.info("{}: replaced connection {} with {}", persistenceId(), conn, newConn);
+ LOG.info("{}: replaced connection {} with {} in {}", persistenceId(), conn, newConn, sw);
}
} finally {
connectionsLock.unlockWrite(stamp);