+ final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
+
+ final Request<?, ?> request = e.getRequest();
+ final Response<?, ?> response = envelope.getMessage();
+
+ // First check for matching target, or move to next entry
+ if (!request.getTarget().equals(response.getTarget())) {
+ continue;
+ }
+
+ // Sanity-check logical sequence, ignore any out-of-order messages
+ if (request.getSequence() != response.getSequence()) {
+ LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
+ return Optional.empty();
+ }
+
+ // Now check session match
+ if (envelope.getSessionId() != txDetails.getSessionId()) {
+ LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
+ return Optional.empty();
+ }
+ if (envelope.getTxSequence() != txDetails.getTxSequence()) {
+ LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
+ return Optional.empty();
+ }
+
+ LOG.debug("Completing request {} with {}", request, envelope);
+ it.remove();
+ return Optional.of(e);
+ }
+
+ return null;
+ }
+
+ ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
+ Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
+ if (maybeEntry == null) {
+ maybeEntry = findMatchingEntry(lastInflight, envelope);
+ }
+
+ if (maybeEntry == null || !maybeEntry.isPresent()) {
+ LOG.warn("No request matching {} found, ignoring response", envelope);
+ return current;
+ }
+
+ lastProgress = ticker.read();
+ final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
+
+ // We have freed up a slot, try to transmit something
+ if (backend != null) {
+ final int toSend = lastTxLimit - currentInflight.size();
+ if (toSend > 0) {
+ runTransmit(toSend);
+ }
+ }
+
+ return ret;
+ }
+
+ private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
+ int toSend = count;
+
+ while (toSend > 0) {
+ final SequencedQueueEntry e = queue.poll();
+ if (e == null) {
+ break;
+ }
+
+ LOG.debug("Transmitting entry {}", e);
+ e.retransmit(backend, nextTxSequence(), lastProgress);
+ toSend--;
+ }
+
+ return toSend;
+ }
+
+ private void runTransmit(final int count) {
+ final int toSend;
+
+ // Process lastInflight first, possibly clearing it
+ if (!lastInflight.isEmpty()) {
+ toSend = transmitEntries(lastInflight, count);
+ if (lastInflight.isEmpty()) {
+ // We won't be needing the queue anymore, change it to specialized implementation
+ lastInflight = EmptyQueue.getInstance();