- for (Object obj : successfulRequests) {
- if (obj instanceof TransactionRequest) {
- LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
- successor.replay((TransactionRequest<?>) obj, response -> { });
- } else {
- Verify.verify(obj instanceof IncrementSequence);
- successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ if (!successfulRequests.isEmpty()) {
+ // We need to find a good timestamp to use for successful requests, as we do not want to time them out
+ // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue
+ // time. We will pick the time of the first entry available. If there is none, we will just use current
+ // time, as all other requests will get enqueued afterwards.
+ final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null);
+ final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime();
+
+ for (Object obj : successfulRequests) {
+ if (obj instanceof TransactionRequest) {
+ LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
+ successor.replayRequest((TransactionRequest<?>) obj, resp -> { }, now);
+ } else {
+ Verify.verify(obj instanceof IncrementSequence);
+ successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ }