import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import com.google.common.base.Ticker;
-import com.google.common.collect.Iterables;
+import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;
import org.junit.After;
@Test
public void testAsIterable() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
queue.enqueue(new ConnectionEntry(request, callback, now), now);
}
- final Iterable<ConnectionEntry> entries = queue.asIterable();
- Assert.assertEquals(sentMessages, Iterables.size(entries));
+ final Collection<ConnectionEntry> entries = queue.drain();
+ Assert.assertEquals(sentMessages, entries.size());
Assert.assertThat(entries, everyItem(entryWithRequest(request)));
}
final long requestSequence = 0L;
final long txSequence = 0L;
final long sessionId = 0L;
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
@Test
public void testIsEmpty() throws Exception {
Assert.assertTrue(queue.isEmpty());
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
@Test
public void testPeek() throws Exception {
- final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
- final Request request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
@Test
public void testPoison() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);