final long now = Ticker.systemTicker().read();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
}
final Collection<ConnectionEntry> entries = queue.drain();
Assert.assertEquals(sentMessages, entries.size());
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
//different transaction id
final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L);
final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
Assert.assertFalse(queue.isEmpty());
}
final long now = Ticker.systemTicker().read();
final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
- queue.enqueue(entry1, now);
- queue.enqueue(entry2, now);
+ queue.enqueueOrForward(entry1, now);
+ queue.enqueueOrForward(entry2, now);
Assert.assertEquals(entry1.getRequest(), queue.peek().getRequest());
}
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
queue.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
verify(callback).accept(any(TransactionFailure.class));
Assert.assertTrue(queue.isEmpty());