There is a bit of confusion between 'replay' and 'forward' methods.
They serve two distinct purposes:
- 'replay' happens during reconnect, i.e. for requests that have
already entered the connection queue and have paid
the delay cost, so they should not pay it again.
- 'forward' happens after reconnect for requests that have raced
with the reconnect process, i.e. they need to hop from
the old connection to the new one. These need to enter
the queue and pay the delay cost.
This patch cleans the codepaths up to use consistent naming, making
it clearer that the problem we are seeing is in the 'replay' path.
Change-Id: Id854e09a0308f8d0a9144d59f41e31950cd58665
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
- void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
- proxy.replaySuccessfulRequests(previousEntries);
+ void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+ proxy.replayRequests(previousEntries);
// Step 2: Collect previous successful requests from the cohorts. We do not want to expose
// the non-throttling interface to the connection, hence we use a wrapper consumer
for (HistoryReconnectCohort c : cohorts) {
// Step 2: Collect previous successful requests from the cohorts. We do not want to expose
// the non-throttling interface to the connection, hence we use a wrapper consumer
for (HistoryReconnectCohort c : cohorts) {
- c.replaySuccessfulRequests(previousEntries);
+ c.replayRequests(previousEntries);
}
// Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
}
// Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
* @param request Request to be forwarded
* @param callback Original callback
*/
* @param request Request to be forwarded
* @param callback Original callback
*/
- final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
if (successor instanceof LocalProxyTransaction) {
final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
if (successor instanceof LocalProxyTransaction) {
// FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
// period required to get into the queue.
// FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the
// period required to get into the queue.
- cohort.replayRequest(request, entry.getCallback(), this::sendToSuccessor);
+ cohort.forwardRequest(request, entry.getCallback(), this::sendToSuccessor);
} catch (RequestException e) {
entry.complete(request.toRequestFailure(e));
}
} catch (RequestException e) {
entry.complete(request.toRequestFailure(e));
}
abstract class HistoryReconnectCohort implements AutoCloseable {
abstract ProxyReconnectCohort getProxy();
abstract class HistoryReconnectCohort implements AutoCloseable {
abstract ProxyReconnectCohort getProxy();
- abstract void replaySuccessfulRequests(Iterable<ConnectionEntry> previousEntries);
+ abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
@Override
public abstract void close();
@Override
public abstract void close();
@GuardedBy("lock")
@Override
@GuardedBy("lock")
@Override
- void replaySuccessfulRequests(final Iterable<ConnectionEntry> previousEntries) {
+ void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
// First look for our Create message
for (ConnectionEntry e : previousEntries) {
final Request<?, ?> req = e.getRequest();
// First look for our Create message
for (ConnectionEntry e : previousEntries) {
final Request<?, ?> req = e.getRequest();
- void replayRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
- final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException {
+ void forwardRequest(final Request<?, ?> request, final Consumer<Response<?, ?>> callback,
+ final BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> forwardTo) throws RequestException {
if (request instanceof TransactionRequest) {
if (request instanceof TransactionRequest) {
- replayTransactionRequest((TransactionRequest<?>) request, callback);
+ forwardTransactionRequest((TransactionRequest<?>) request, callback);
} else if (request instanceof LocalHistoryRequest) {
} else if (request instanceof LocalHistoryRequest) {
- replayTo.accept(request, callback);
+ forwardTo.accept(request, callback);
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
}
} else {
throw new IllegalArgumentException("Unhandled request " + request);
}
}
- private void replayTransactionRequest(final TransactionRequest<?> request,
+ private void forwardTransactionRequest(final TransactionRequest<?> request,
final Consumer<Response<?, ?>> callback) throws RequestException {
final AbstractProxyTransaction proxy;
final Consumer<Response<?, ?>> callback) throws RequestException {
final AbstractProxyTransaction proxy;
throw new RequestReplayException("Failed to find proxy for %s", request);
}
throw new RequestReplayException("Failed to find proxy for %s", request);
}
- proxy.replayRequest(request, callback);
+ proxy.forwardRequest(request, callback);
abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
abstract class ProxyReconnectCohort implements Identifiable<LocalHistoryIdentifier> {
- abstract void replaySuccessfulRequests(Iterable<ConnectionEntry> previousEntries);
+ abstract void replayRequests(Iterable<ConnectionEntry> previousEntries);
abstract ProxyHistory finishReconnect();
abstract ProxyHistory finishReconnect();
- abstract void replayRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback,
+ abstract void forwardRequest(Request<?, ?> request, Consumer<Response<?, ?>> callback,
BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException;
}
BiConsumer<Request<?, ?>, Consumer<Response<?, ?>>> replayTo) throws RequestException;
}