+ @Test
+ public void testCompleteOrdering() {
+ final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
+ final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+ final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
+ final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
+ final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+
+ // Fill the queue up to capacity + 1
+ queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
+ queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
+ queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
+ queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+ assertEqualRequests(queue.getInflight(), req0, req1, req2);
+ assertEqualRequests(queue.getPending(), req3);
+
+ // Now complete req0, which should transmit req3
+ queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
+ assertEqualRequests(queue.getInflight(), req1, req2, req3);
+ assertEqualRequests(queue.getPending());
+
+ // Now complete req1, which should leave an empty slot
+ queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
+ assertEqualRequests(queue.getInflight(), req2, req3);
+ assertEqualRequests(queue.getPending());
+
+ // Enqueue req4, which should be immediately transmitted
+ queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+ assertEqualRequests(queue.getInflight(), req2, req3, req4);
+ assertEqualRequests(queue.getPending());
+
+ // Enqueue req5, which should move to pending
+ queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+ assertEqualRequests(queue.getInflight(), req2, req3, req4);
+ assertEqualRequests(queue.getPending(), req5);
+
+ // Remove req4, creating an inconsistency...
+ queue.getInflight().removeLast();
+ assertEqualRequests(queue.getInflight(), req2, req3);
+ assertEqualRequests(queue.getPending(), req5);
+
+ // ... and enqueue req6, which should cause req5 to be transmitted
+ queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+ assertEqualRequests(queue.getInflight(), req2, req3, req5);
+ assertEqualRequests(queue.getPending(), req6);
+ }
+
+ private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
+ final Request<?, ?>... requests) {
+ final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
+ ConnectionEntry::getRequest));
+ assertEquals(Arrays.asList(requests), queued);
+ }
+}