super(context, cookie, backend);
}
- private TransmittedConnectionEntry transmit(final ConnectionEntry entry) {
+ private void transmit(final ConnectionEntry entry) {
final long txSequence = nextTxSequence++;
final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(),
txSequence);
+ // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread
+ // than the client actor thread, in which case the round-trip could be made faster than we can enqueue --
+ // in which case the receive routine would not find the entry.
+ final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, sessionId(), txSequence,
+ readTime());
+ appendToInflight(txEntry);
+
final ActorRef actor = remoteActor();
LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor);
actor.tell(toSend, ActorRef.noSender());
-
- return new TransmittedConnectionEntry(entry, sessionId(), txSequence, readTime());
}
@Override
void enqueueEntry(final ConnectionEntry entry) {
if (inflightSize() < remoteMaxMessages()) {
- appendToInflight(transmit(entry));
+ transmit(entry);
LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this);
} else {
LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest());
}
LOG.debug("Transmitting entry {}", e);
- appendToInflight(transmit(e));
+ transmit(e);
toSend--;
}
}