+ return null;
+ }
+
+ ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
+ Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
+ if (maybeEntry == null) {
+ maybeEntry = findMatchingEntry(lastInflight, envelope);
+ }
+
+ if (maybeEntry == null || !maybeEntry.isPresent()) {
+ LOG.warn("No request matching {} found, ignoring response", envelope);
+ return current;
+ }
+
+ lastProgress = ticker.read();
+ final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
+
+ // We have freed up a slot, try to transmit something
+ if (backend != null) {
+ final int toSend = lastTxLimit - currentInflight.size();
+ if (toSend > 0) {
+ runTransmit(toSend);
+ }
+ }
+
+ return ret;
+ }
+
+ private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
+ int toSend = count;
+
+ while (toSend > 0) {
+ final SequencedQueueEntry e = queue.poll();
+ if (e == null) {
+ break;
+ }
+
+ LOG.debug("Transmitting entry {}", e);
+ e.retransmit(backend, nextTxSequence(), lastProgress);
+ toSend--;
+ }
+
+ return toSend;
+ }
+
+ private void runTransmit(final int count) {
+ final int toSend;
+
+ // Process lastInflight first, possibly clearing it
+ if (!lastInflight.isEmpty()) {
+ toSend = transmitEntries(lastInflight, count);
+ if (lastInflight.isEmpty()) {
+ // We won't be needing the queue anymore, change it to specialized implementation
+ lastInflight = EmptyQueue.getInstance();
+ }
+ } else {
+ toSend = count;
+ }
+
+ // Process pending next.
+ transmitEntries(pending, toSend);