-
- SequencedQueue queueFor(final Long cookie) {
- return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
- }
-
- void removeQueue(final SequencedQueue queue) {
- queues.remove(queue.getCookie(), queue);
- }
-
- ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
- final WritableIdentifier id = response.getMessage().getTarget();
-
- // FIXME: this will need to be updated for other Request/Response types to extract cookie
- Preconditions.checkArgument(id instanceof TransactionIdentifier);
- final TransactionIdentifier txId = (TransactionIdentifier) id;
-
- final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
- if (queue == null) {
- LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
- return current;
- } else {
- return queue.complete(current, response);
- }
- }
-
- void poison(final RequestException cause) {
- for (SequencedQueue q : queues.values()) {
- q.poison(cause);
- }
-
- queues.clear();
- }