import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
import com.google.common.base.Ticker;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.testing.FakeTicker;
import java.util.Arrays;
import java.util.Collection;
private BackendInfo backendInfo;
+ private static long now() {
+ return Ticker.systemTicker().read();
+ }
+
@Override
protected int getMaxInFlightMessages() {
return backendInfo.getMaxMessages();
@Override
protected TransmitQueue.Transmitting createQueue() {
backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
- return new TransmitQueue.Transmitting(0, backendInfo);
+ return new TransmitQueue.Transmitting(new TransmitQueue.Halted(0), 0, backendInfo, now());
}
@Test
final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
final Consumer<Response<?, ?>> callback1 = createConsumerMock();
final Consumer<Response<?, ?>> callback2 = createConsumerMock();
- final long now1 = Ticker.systemTicker().read();
- final long now2 = Ticker.systemTicker().read();
+ final long now1 = now();
+ final long now2 = now();
//enqueue 2 entries
- queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
- queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
+ queue.enqueueOrForward(new ConnectionEntry(request1, callback1, now1), now1);
+ queue.enqueueOrForward(new ConnectionEntry(request2, callback2, now2), now2);
final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
//complete entries in different order
public void testEnqueueCanTransmit() throws Exception {
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
- final long now = Ticker.systemTicker().read();
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ final long now = now();
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
assertEquals(request, requestEnvelope.getMessage());
}
public void testEnqueueBackendFull() throws Exception {
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
- final long now = Ticker.systemTicker().read();
+ final long now = now();
final int sentMessages = getMaxInFlightMessages() + 1;
for (int i = 0; i < sentMessages; i++) {
- queue.enqueue(new ConnectionEntry(request, callback, now), now);
+ queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
}
for (int i = 0; i < getMaxInFlightMessages(); i++) {
probe.expectMsgClass(RequestEnvelope.class);
}
probe.expectNoMsg();
- final Iterable<ConnectionEntry> entries = queue.asIterable();
- assertEquals(sentMessages, Iterables.size(entries));
+ final Collection<ConnectionEntry> entries = queue.drain();
+ assertEquals(sentMessages, entries.size());
assertThat(entries, everyItem(entryWithRequest(request)));
}
public void testTransmit() throws Exception {
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
- final long now = Ticker.systemTicker().read();
+ final long now = now();
final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
queue.transmit(entry, now);
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
- queue.enqueue(entry, ticker.read());
final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
- final long setForwarderNow = ticker.read();
- queue.setForwarder(forwarder, setForwarderNow);
- verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
+ queue.setForwarder(forwarder, ticker.read());
final long secondEnqueueNow = ticker.read();
- queue.enqueue(entry, secondEnqueueNow);
+ queue.enqueueOrForward(entry, secondEnqueueNow);
verify(forwarder).forwardEntry(entry, secondEnqueueNow);
}
final Consumer<Response<?, ?>> callback = createConsumerMock();
// Fill the queue up to capacity + 1
- queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
- queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req0, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req1, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req2, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req3, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req0, req1, req2);
assertEqualRequests(queue.getPending(), req3);
assertEqualRequests(queue.getPending());
// Enqueue req4, which should be immediately transmitted
- queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req4, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req4);
assertEqualRequests(queue.getPending());
// Enqueue req5, which should move to pending
- queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req5, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req4);
assertEqualRequests(queue.getPending(), req5);
assertEqualRequests(queue.getPending(), req5);
// ... and enqueue req6, which should cause req5 to be transmitted
- queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+ queue.enqueueOrForward(new ConnectionEntry(req6, callback, 0), 0);
assertEqualRequests(queue.getInflight(), req2, req3, req5);
assertEqualRequests(queue.getPending(), req6);
}