// Cohort aware forwarder, which forwards the request to the cohort, giving it a reference to the successor
// connection
final class BouncingReconnectForwarder extends ReconnectForwarder {
- private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class);
-
- private static final RequestException FAILED_TO_REPLAY_EXCEPTION = new RequestException("Cohort not found") {
+ private static final class CohortNotFoundException extends RequestException {
private static final long serialVersionUID = 1L;
+ CohortNotFoundException(final LocalHistoryIdentifier historyId) {
+ super("Cohort for " + historyId + " not found");
+ }
+
@Override
public boolean isRetriable() {
return false;
}
- };
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(BouncingReconnectForwarder.class);
private final Map<LocalHistoryIdentifier, ProxyReconnectCohort> cohorts;
HistoryReconnectCohort::getProxy), ProxyReconnectCohort::getIdentifier));
}
+ @Override
+ protected void forwardEntry(final ConnectionEntry entry, final long now) {
+ try {
+ findCohort(entry).forwardEntry(entry, this::sendToSuccessor);
+ } catch (RequestException e) {
+ entry.complete(entry.getRequest().toRequestFailure(e));
+ }
+ }
@Override
- protected void forwardEntry(final ConnectionEntry entry) {
+ protected void replayEntry(final ConnectionEntry entry, final long now) {
+ try {
+ findCohort(entry).replayEntry(entry, this::replayToSuccessor);
+ } catch (RequestException e) {
+ entry.complete(entry.getRequest().toRequestFailure(e));
+ }
+ }
+
+ private ProxyReconnectCohort findCohort(final ConnectionEntry entry) throws CohortNotFoundException {
final Request<? , ?> request = entry.getRequest();
final LocalHistoryIdentifier historyId;
throw new IllegalArgumentException("Unhandled request " + request);
}
- try {
- final ProxyReconnectCohort cohort = cohorts.get(historyId);
- if (cohort == null) {
- LOG.warn("Cohort for request {} not found, aborting it", request);
- throw FAILED_TO_REPLAY_EXCEPTION;
- }
-
- // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
- // period required to get into the queue.
- cohort.replayRequest(request, entry.getCallback(), this::sendToSuccessor);
- } catch (RequestException e) {
- entry.complete(request.toRequestFailure(e));
+ final ProxyReconnectCohort cohort = cohorts.get(historyId);
+ if (cohort == null) {
+ LOG.warn("Cohort for request {} not found, aborting it", request);
+ throw new CohortNotFoundException(historyId);
}
+
+ return cohort;
}
-}
\ No newline at end of file
+}