+ private long enqueueOrForward(final ConnectionEntry entry, final long now) {
+ lock.lock();
+ try {
+ commonEnqueue(entry, now);
+ return queue.enqueueOrForward(entry, now);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Enqueue an entry, possibly also transmitting it.
+ */
+ public final void enqueueEntry(final ConnectionEntry entry, final long now) {
+ lock.lock();
+ try {
+ commonEnqueue(entry, now);
+ queue.enqueueOrReplay(entry, now);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @GuardedBy("lock")
+ private void commonEnqueue(final ConnectionEntry entry, final long now) {
+ final RequestException maybePoison = poisoned;
+ if (maybePoison != null) {
+ throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+ }
+
+ if (queue.isEmpty()) {
+ // The queue is becoming non-empty, schedule a timer.
+ scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
+ }
+ }
+
+ // To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed.
+ final void cancelDebt() {
+ queue.cancelDebt(currentTime());
+ }
+