package org.opendaylight.controller.cluster.access.client;
import akka.actor.ActorRef;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
- private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
- private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
+ private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
+ private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
private final ProgressTracker tracker;
private ReconnectForwarder successor;
tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
- int toSend = canTransmitCount(inflight.size());
- while (toSend > 0) {
+ final int toSend = canTransmitCount(inflight.size());
+ if (toSend > 0 && !pending.isEmpty()) {
+ transmitEntries(toSend, now);
+ }
+
+ return Optional.of(entry);
+ }
+
+ private void transmitEntries(final int maxTransmit, final long now) {
+ for (int i = 0; i < maxTransmit; ++i) {
final ConnectionEntry e = pending.poll();
if (e == null) {
- break;
+ LOG.debug("Queue {} transmitted {} requests", this, i);
+ return;
}
- LOG.debug("Transmitting entry {}", e);
- transmit(e, now);
- toSend--;
+ transmitEntry(e, now);
}
- return Optional.of(entry);
+ LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
+ }
+
+ private void transmitEntry(final ConnectionEntry entry, final long now) {
+ LOG.debug("Queue {} transmitting entry {}", entry);
+ // We are not thread-safe and are supposed to be externally-guarded,
+ // hence send-before-record should be fine.
+ // This needs to be revisited if the external guards are lowered.
+ inflight.addLast(transmit(entry, now));
}
/**
// Reserve an entry before we do anything that can fail
final long delay = tracker.openTask(now);
- if (canTransmitCount(inflight.size()) <= 0) {
+
+ /*
+ * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
+ * to have available send slots and non-empty pending queue.
+ */
+ final int toSend = canTransmitCount(inflight.size());
+ if (toSend <= 0) {
LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
- pending.add(entry);
- } else {
- // We are not thread-safe and are supposed to be externally-guarded,
- // hence send-before-record should be fine.
- // This needs to be revisited if the external guards are lowered.
- inflight.offer(transmit(entry, now));
- LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+ pending.addLast(entry);
+ return delay;
+ }
+
+ if (pending.isEmpty()) {
+ transmitEntry(entry, now);
+ return delay;
}
+
+ pending.addLast(entry);
+ transmitEntries(toSend, now);
return delay;
}
}
}
+ @VisibleForTesting
+ Deque<TransmittedConnectionEntry> getInflight() {
+ return inflight;
+ }
+
+ @VisibleForTesting
+ Deque<ConnectionEntry> getPending() {
+ return pending;
+ }
+
/*
* We are using tri-state return here to indicate one of three conditions:
* - if a matching entry is found, return an Optional containing it
package org.opendaylight.controller.cluster.access.client;
import static org.hamcrest.CoreMatchers.everyItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
import com.google.common.base.Ticker;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.testing.FakeTicker;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
+import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
public void testComplete() throws Exception {
final long sequence1 = 0L;
final long sequence2 = 1L;
- final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
+ final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
- final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
+ final Request<?, ?> request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
final Consumer<Response<?, ?>> callback1 = createConsumerMock();
final Consumer<Response<?, ?>> callback2 = createConsumerMock();
final long now1 = Ticker.systemTicker().read();
queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
//check first entry
final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
- Assert.assertEquals(transmittedEntry1.getRequest(), request1);
- Assert.assertEquals(transmittedEntry1.getTxSequence(), sequence1);
- Assert.assertEquals(transmittedEntry1.getCallback(), callback1);
+ assertEquals(transmittedEntry1.getRequest(), request1);
+ assertEquals(transmittedEntry1.getTxSequence(), sequence1);
+ assertEquals(transmittedEntry1.getCallback(), callback1);
//check second entry
final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
- Assert.assertEquals(transmittedEntry2.getRequest(), request2);
- Assert.assertEquals(transmittedEntry2.getTxSequence(), sequence2);
- Assert.assertEquals(transmittedEntry2.getCallback(), callback2);
+ assertEquals(transmittedEntry2.getRequest(), request2);
+ assertEquals(transmittedEntry2.getTxSequence(), sequence2);
+ assertEquals(transmittedEntry2.getCallback(), callback2);
}
@Test
public void testEnqueueCanTransmit() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
queue.enqueue(new ConnectionEntry(request, callback, now), now);
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
- Assert.assertEquals(request, requestEnvelope.getMessage());
+ assertEquals(request, requestEnvelope.getMessage());
}
@Test
public void testEnqueueBackendFull() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
final int sentMessages = getMaxInFlightMessages() + 1;
}
probe.expectNoMsg();
final Iterable<ConnectionEntry> entries = queue.asIterable();
- Assert.assertEquals(sentMessages, Iterables.size(entries));
- Assert.assertThat(entries, everyItem(entryWithRequest(request)));
+ assertEquals(sentMessages, Iterables.size(entries));
+ assertThat(entries, everyItem(entryWithRequest(request)));
}
@Test
@Override
public void testCanTransmitCount() throws Exception {
- Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
- Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
+ assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
+ assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
}
@Test
@Override
public void testTransmit() throws Exception {
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final long now = Ticker.systemTicker().read();
final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
queue.transmit(entry, now);
final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
- Assert.assertEquals(request, requestEnvelope.getMessage());
+ assertEquals(request, requestEnvelope.getMessage());
}
@Test
public void testSetForwarder() throws Exception {
final FakeTicker ticker = new FakeTicker();
ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
- final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
queue.enqueue(entry, ticker.read());
verify(forwarder).forwardEntry(entry, secondEnqueueNow);
}
-}
\ No newline at end of file
+ @Test
+ public void testCompleteOrdering() {
+ final Request<?, ?> req0 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
+ final Request<?, ?> req1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
+ final Request<?, ?> req2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 2L, probe.ref());
+ final Request<?, ?> req3 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 3L, probe.ref());
+ final Request<?, ?> req4 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 4L, probe.ref());
+ final Request<?, ?> req5 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 5L, probe.ref());
+ final Request<?, ?> req6 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 6L, probe.ref());
+ final Consumer<Response<?, ?>> callback = createConsumerMock();
+
+ // Fill the queue up to capacity + 1
+ queue.enqueue(new ConnectionEntry(req0, callback, 0), 0);
+ queue.enqueue(new ConnectionEntry(req1, callback, 0), 0);
+ queue.enqueue(new ConnectionEntry(req2, callback, 0), 0);
+ queue.enqueue(new ConnectionEntry(req3, callback, 0), 0);
+ assertEqualRequests(queue.getInflight(), req0, req1, req2);
+ assertEqualRequests(queue.getPending(), req3);
+
+ // Now complete req0, which should transmit req3
+ queue.complete(new FailureEnvelope(req0.toRequestFailure(mock(RequestException.class)), 0, 0, 0), 0);
+ assertEqualRequests(queue.getInflight(), req1, req2, req3);
+ assertEqualRequests(queue.getPending());
+
+ // Now complete req1, which should leave an empty slot
+ queue.complete(new FailureEnvelope(req1.toRequestFailure(mock(RequestException.class)), 0, 1, 0), 0);
+ assertEqualRequests(queue.getInflight(), req2, req3);
+ assertEqualRequests(queue.getPending());
+
+ // Enqueue req4, which should be immediately transmitted
+ queue.enqueue(new ConnectionEntry(req4, callback, 0), 0);
+ assertEqualRequests(queue.getInflight(), req2, req3, req4);
+ assertEqualRequests(queue.getPending());
+
+ // Enqueue req5, which should move to pending
+ queue.enqueue(new ConnectionEntry(req5, callback, 0), 0);
+ assertEqualRequests(queue.getInflight(), req2, req3, req4);
+ assertEqualRequests(queue.getPending(), req5);
+
+ // Remove req4, creating an inconsistency...
+ queue.getInflight().removeLast();
+ assertEqualRequests(queue.getInflight(), req2, req3);
+ assertEqualRequests(queue.getPending(), req5);
+
+ // ... and enqueue req6, which should cause req5 to be transmitted
+ queue.enqueue(new ConnectionEntry(req6, callback, 0), 0);
+ assertEqualRequests(queue.getInflight(), req2, req3, req5);
+ assertEqualRequests(queue.getPending(), req6);
+ }
+
+ private static void assertEqualRequests(final Collection<? extends ConnectionEntry> queue,
+ final Request<?, ?>... requests) {
+ final List<Request<?, ?>> queued = ImmutableList.copyOf(Collections2.transform(queue,
+ ConnectionEntry::getRequest));
+ assertEquals(Arrays.asList(requests), queued);
+ }
+}