@NotThreadSafe
abstract class TransmitQueue {
static final class Halted extends TransmitQueue {
+ // For ConnectingClientConnection.
Halted(final int targetDepth) {
super(targetDepth);
}
+ // For ReconnectingClientConnection.
+ Halted(final TransmitQueue oldQueue, final long now) {
+ super(oldQueue, now);
+ }
+
@Override
int canTransmitCount(final int inflightSize) {
return 0;
private final BackendInfo backend;
private long nextTxSequence;
- Transmitting(final int targetDepth, final BackendInfo backend) {
- super(targetDepth);
+ // For ConnectedClientConnection.
+ Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) {
+ super(oldQueue, targetDepth, now);
this.backend = Preconditions.checkNotNull(backend);
}
private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
- private final ProgressTracker tracker;
+ private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits.
private ReconnectForwarder successor;
+ /**
+ * Construct initial transmitting queue.
+ */
TransmitQueue(final int targetDepth) {
tracker = new AveragingProgressTracker(targetDepth);
}
+ /**
+ * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
+ */
+ TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
+ tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
+ }
+
+ /**
+ * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
+ */
+ TransmitQueue(final TransmitQueue oldQueue, final long now) {
+ tracker = new AveragingProgressTracker(oldQueue.tracker, now);
+ }
+
+ /**
+ * Cancel the accumulated sum of delays as we expect the new backend to work now.
+ */
+ void cancelDebt(final long now) {
+ tracker.cancelDebt(now);
+ }
+
/**
* Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
* to be added to it during replay. When we set the successor all entries enqueued between when this methods
inflight.addLast(transmit(entry, now));
}
- /**
- * Enqueue an entry, possibly also transmitting it.
- *
- * @return Delay to be forced on the calling thread, in nanoseconds.
- */
- final long enqueue(final ConnectionEntry entry, final long now) {
+ final long enqueueOrForward(final ConnectionEntry entry, final long now) {
if (successor != null) {
// This call will pay the enqueuing price, hence the caller does not have to
successor.forwardEntry(entry, now);
return 0;
}
+ return enqueue(entry, now);
+ }
+
+ final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
+ if (successor != null) {
+ successor.replayEntry(entry, now);
+ } else {
+ enqueue(entry, now);
+ }
+ }
+
+ /**
+ * Enqueue an entry, possibly also transmitting it.
+ *
+ * @return Delay to be forced on the calling thread, in nanoseconds.
+ */
+ private long enqueue(final ConnectionEntry entry, final long now) {
+
// XXX: we should place a guard against incorrect entry sequences:
// entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues