HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
}
-
@Override
protected void forwardEntry(final ConnectionEntry entry, final long now) {
+ try {
+ findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
+ } catch (RequestException e) {
+ entry.complete(entry.getRequest().toRequestFailure(e));
+ }
+ }
+
+ @Override
+ protected void replayEntry(final ConnectionEntry entry, final long now) {
+ try {
+ findCohort(entry).replayEntry(entry, this::replayToSuccessor);
+ } catch (RequestException e) {
+ entry.complete(entry.getRequest().toRequestFailure(e));
+ }
+ }
+
+ private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
final Request<? , ?> request = entry.getRequest();
final LocalHistoryIdentifier historyId;
throw new IllegalArgumentException("Unhandled request " + request);
}
- try {
- final ProxyReconnectCohort cohort = cohorts.get(historyId);
- if (cohort == null) {
- LOG.warn("Cohort for request {} not found, aborting it", request);
- throw new CohortNotFoundException(historyId);
- }
-
- // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
- // period required to get into the queue.
- cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
- } catch (RequestException e) {
- entry.complete(request.toRequestFailure(e));
+ final ProxyReconnectCohort cohort = cohorts.get(historyId);
+ if (cohort == null) {
+ LOG.warn("Cohort for request {} not found, aborting it", request);
+ throw new CohortNotFoundException(historyId);
}
+
+ return cohort;
}
-}
\ No newline at end of file
+}