- for (Object obj : successfulRequests) {
- if (obj instanceof TransactionRequest) {
- LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
- successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, response -> { });
- } else {
- Verify.verify(obj instanceof IncrementSequence);
- successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ if (!successfulRequests.isEmpty()) {
+ // We need to find a good timestamp to use for successful requests, as we do not want to time them out
+ // nor create timing inconsistencies in the queue -- requests are expected to be ordered by their enqueue
+ // time. We will pick the time of the first entry available. If there is none, we will just use current
+ // time, as all other requests will get enqueued afterwards.
+ final ConnectionEntry firstInQueue = Iterables.getFirst(enqueuedEntries, null);
+ final long now = firstInQueue != null ? firstInQueue.getEnqueuedTicks() : parent.currentTime();
+
+ for (Object obj : successfulRequests) {
+ if (obj instanceof TransactionRequest) {
+ LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
+ successor.doReplayRequest((TransactionRequest<?>) obj, resp -> { /*NOOP*/ }, now);
+ } else {
+ Verify.verify(obj instanceof IncrementSequence);
+ final IncrementSequence increment = (IncrementSequence) obj;
+ successor.doReplayRequest(new IncrementTransactionSequenceRequest(getIdentifier(),
+ increment.getSequence(), localActor(), isSnapshotOnly(),
+ increment.getDelta()), resp -> { /*NOOP*/ }, now);
+ LOG.debug("Incrementing sequence {} to successor {}", obj, successor);
+ }