+ @GuardedBy("lock")
+ final void setForwarder(final ReconnectForwarder forwarder) {
+ queue.setForwarder(forwarder, currentTime());
+ }
+
+ @GuardedBy("lock")
+ abstract ClientActorBehavior<T> lockedReconnect(ClientActorBehavior<T> current,
+ RequestException runtimeRequestException);
+
+ final void sendEntry(final ConnectionEntry entry, final long now) {
+ long delay = enqueueOrForward(entry, now);
+ try {
+ if (delay >= DEBUG_DELAY_NANOS) {
+ if (delay > MAX_DELAY_NANOS) {
+ LOG.info("Capping {} throttle delay from {} to {} seconds", this,
+ TimeUnit.NANOSECONDS.toSeconds(delay), MAX_DELAY_SECONDS, new Throwable());
+ delay = MAX_DELAY_NANOS;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Sleeping for {}ms on connection {}", context.persistenceId(),
+ TimeUnit.NANOSECONDS.toMillis(delay), this);
+ }
+ }
+ TimeUnit.NANOSECONDS.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.debug("Interrupted after sleeping {}ns", currentTime() - now, e);
+ }
+ }
+
+ final ClientActorBehavior<T> reconnect(final ClientActorBehavior<T> current, final RequestException cause) {
+ lock.lock();
+ try {
+ return lockedReconnect(current, cause);
+ } finally {
+ lock.unlock();
+ }