enqueueEntry(new ConnectionEntry(request, callback, enqueuedTicks), currentTime());
}
- public final long enqueueEntry(final ConnectionEntry entry, final long now) {
+ private long enqueueOrForward(final ConnectionEntry entry, final long now) {
lock.lock();
try {
- final RequestException maybePoison = poisoned;
- if (maybePoison != null) {
- throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
- }
+ commonEnqueue(entry, now);
+ return queue.enqueueOrForward(entry, now);
+ } finally {
+ lock.unlock();
+ }
+ }
- if (queue.isEmpty()) {
- // The queue is becoming non-empty, schedule a timer.
- scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
- }
- return queue.enqueue(entry, now);
+ /**
+ * Enqueue an entry, possibly also transmitting it.
+ */
+ public final void enqueueEntry(final ConnectionEntry entry, final long now) {
+ lock.lock();
+ try {
+ commonEnqueue(entry, now);
+ queue.enqueueOrReplay(entry, now);
} finally {
lock.unlock();
}
}
+ @GuardedBy("lock")
+ private void commonEnqueue(final ConnectionEntry entry, final long now) {
+ final RequestException maybePoison = poisoned;
+ if (maybePoison != null) {
+ throw new IllegalStateException("Connection " + this + " has been poisoned", maybePoison);
+ }
+
+ if (queue.isEmpty()) {
+ // The queue is becoming non-empty, schedule a timer.
+ scheduleTimer(entry.getEnqueuedTicks() + context.config().getRequestTimeout() - now);
+ }
+ }
+
// To be called from ClientActorBehavior on ConnectedClientConnection after entries are replayed.
final void cancelDebt() {
queue.cancelDebt(currentTime());
RequestException runtimeRequestException);
final void sendEntry(final ConnectionEntry entry, final long now) {
- long delay = enqueueEntry(entry, now);
+ long delay = enqueueOrForward(entry, now);
try {
if (delay >= DEBUG_DELAY_NANOS) {
if (delay > MAX_DELAY_NANOS) {
inflight.addLast(transmit(entry, now));
}
- /**
- * Enqueue an entry, possibly also transmitting it.
- *
- * @return Delay to be forced on the calling thread, in nanoseconds.
- */
- final long enqueue(final ConnectionEntry entry, final long now) {
+ final long enqueueOrForward(final ConnectionEntry entry, final long now) {
if (successor != null) {
// This call will pay the enqueuing price, hence the caller does not have to
successor.forwardEntry(entry, now);
return 0;
}
+ return enqueue(entry, now);
+ }
+
+ final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
+ if (successor != null) {
+ successor.replayEntry(entry, now);
+ } else {
+ enqueue(entry, now);
+ }
+ }
+
+ /**
+ * Enqueue an entry, possibly also transmitting it.
+ *
+ * @return Delay to be forced on the calling thread, in nanoseconds.
+ */
+ private long enqueue(final ConnectionEntry entry, final long now) {
+
// XXX: we should place a guard against incorrect entry sequences:
// entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
final long now = Ticker.systemTicker().read();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
}
final Collection<ConnectionEntry> entries = queue.drain();
Assert.assertEquals(sentMessages, entries.size());
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
//different transaction id
final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L);
final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
Assert.assertFalse(queue.isEmpty());
}
final long now = Ticker.systemTicker().read();
final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
- queue.enqueue(entry1, now);
- queue.enqueue(entry2, now);
+ queue.enqueueOrForward(entry1, now);
+ queue.enqueueOrForward(entry2, now);
Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
}
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
verify(callback).accept(any(TransactionFailure.class));
Assert.assertTrue(queue.isEmpty());
final long now1 = now();
final long now2 = now();
//enqueue 2 entries
- queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
- queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
+ queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
+ queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
//complete entries in different order
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = now();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
assertEquals(request, requestEnvelope.getMessage());
}
final long now = now();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
}
for (int i = 0; i < getMaxInFlightMessages(); i++) {
probe.expectMsgClass(RequestEnvelope.class);
final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
queue.setForwarder(forwarder, ticker.read());
final long secondEnqueueNow = ticker.read();
- queue.enqueue(entry, secondEnqueueNow);
+ queue.enqueueOrForward(entry, secondEnqueueNow);
verify(forwarder).forwardEntry(entry, secondEnqueueNow);
}
final Consumer<Response<?, ?>> callback = createConsumerMock();
// Fill the queue up to capacity + 1
- queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req0, req1, req2);
assertEqualRequests(queue.getPending(), req3);
assertEqualRequests(queue.getPending());
// Enqueue req4, which should be immediately transmitted
- queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req4);
assertEqualRequests(queue.getPending());
// Enqueue req5, which should move to pending
- queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req4);
assertEqualRequests(queue.getPending(), req5);
assertEqualRequests(queue.getPending(), req5);
// ... and enqueue req6, which should cause req5 to be transmitted
- queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req5);
assertEqualRequests(queue.getPending(), req6);
}