+ return previousEntries -> finishReconnect(newConn, stamp, cohorts, previousEntries);
+ }
+
+ private ReconnectForwarder finishReconnect(final ConnectedClientConnection<ShardBackendInfo> newConn,
+ final long stamp, final Collection<HistoryReconnectCohort> cohorts,
+ final Collection<ConnectionEntry> previousEntries) {
+ try {
+ // Step 2: Collect previous successful requests from the cohorts. We do not want to expose
+ // the non-throttling interface to the connection, hence we use a wrapper consumer
+ for (HistoryReconnectCohort c : cohorts) {
+ c.replayRequests(previousEntries);
+ }
+
+ // Step 3: Install a forwarder, which will forward requests back to affected cohorts. Any outstanding
+ // requests will be immediately sent to it and requests being sent concurrently will get
+ // forwarded once they hit the new connection.
+ return BouncingReconnectForwarder.forCohorts(newConn, cohorts);
+ } finally {