final long now1 = now();
final long now2 = now();
//enqueue 2 entries
- queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
- queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
+ queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
+ queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
//complete entries in different order
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = now();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
assertEquals(request, requestEnvelope.getMessage());
}
final long now = now();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
}
for (int i = 0; i < getMaxInFlightMessages(); i++) {
probe.expectMsgClass(RequestEnvelope.class);
final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
queue.setForwarder(forwarder, ticker.read());
final long secondEnqueueNow = ticker.read();
- queue.enqueue(entry, secondEnqueueNow);
+ queue.enqueueOrForward(entry, secondEnqueueNow);
verify(forwarder).forwardEntry(entry, secondEnqueueNow);
}
final Consumer<Response<?, ?>> callback = createConsumerMock();
// Fill the queue up to capacity + 1
- queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req0, req1, req2);
assertEqualRequests(queue.getPending(), req3);
assertEqualRequests(queue.getPending());
// Enqueue req4, which should be immediately transmitted
- queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req4);
assertEqualRequests(queue.getPending());
// Enqueue req5, which should move to pending
- queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req4);
assertEqualRequests(queue.getPending(), req5);
assertEqualRequests(queue.getPending(), req5);
// ... and enqueue req6, which should cause req5 to be transmitted
- queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req5);
assertEqualRequests(queue.getPending(), req6);
}