X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmitQueue.java;h=15ad958304f9b61cd1a20b322b12c7354542759e;hb=fe69101801085580f2fe72762abea5c5fa83d978;hp=9ab80d0d0085df1ef612e48606079b4eae413ea0;hpb=17e4759c7561e09786a22210e43b5b32db45149e;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index 9ab80d0d00..15ad958304 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -8,11 +8,13 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.collect.Iterables; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.Deque; import java.util.Iterator; import java.util.Optional; import java.util.Queue; @@ -95,8 +97,8 @@ abstract class TransmitQueue { private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class); - private final ArrayDeque inflight = new ArrayDeque<>(); - private final ArrayDeque pending = new ArrayDeque<>(); + private final Deque inflight = new ArrayDeque<>(); + private final Deque pending = new ArrayDeque<>(); private final ProgressTracker tracker; private ReconnectForwarder successor; @@ -133,19 +135,34 @@ abstract class TransmitQueue { tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); // We have freed up a slot, try to transmit something - int toSend = canTransmitCount(inflight.size()); - while (toSend > 0) { + final int toSend = canTransmitCount(inflight.size()); + if (toSend > 0 && !pending.isEmpty()) { + transmitEntries(toSend, now); + } + + return Optional.of(entry); + } + + private void transmitEntries(final int maxTransmit, final long now) { + for (int i = 0; i < maxTransmit; ++i) { final ConnectionEntry e = pending.poll(); if (e == null) { - break; + LOG.debug("Queue {} transmitted {} requests", this, i); + return; } - LOG.debug("Transmitting entry {}", e); - transmit(e, now); - toSend--; + transmitEntry(e, now); } - return Optional.of(entry); + LOG.debug("Queue {} transmitted {} requests", this, maxTransmit); + } + + private void transmitEntry(final ConnectionEntry entry, final long now) { + LOG.debug("Queue {} transmitting entry {}", entry); + // We are not thread-safe and are supposed to be externally-guarded, + // hence send-before-record should be fine. + // This needs to be revisited if the external guards are lowered. + inflight.addLast(transmit(entry, now)); } /** @@ -164,16 +181,25 @@ abstract class TransmitQueue { // Reserve an entry before we do anything that can fail final long delay = tracker.openTask(now); - if (canTransmitCount(inflight.size()) <= 0) { + + /* + * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen + * to have available send slots and non-empty pending queue. + */ + final int toSend = canTransmitCount(inflight.size()); + if (toSend <= 0) { LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest()); - pending.add(entry); - } else { - // We are not thread-safe and are supposed to be externally-guarded, - // hence send-before-record should be fine. - // This needs to be revisited if the external guards are lowered. - inflight.offer(transmit(entry, now)); - LOG.debug("Sent request {} on queue {}", entry.getRequest(), this); + pending.addLast(entry); + return delay; + } + + if (pending.isEmpty()) { + transmitEntry(entry, now); + return delay; } + + pending.addLast(entry); + transmitEntries(toSend, now); return delay; } @@ -220,6 +246,16 @@ abstract class TransmitQueue { } } + @VisibleForTesting + Deque getInflight() { + return inflight; + } + + @VisibleForTesting + Deque getPending() { + return pending; + } + /* * We are using tri-state return here to indicate one of three conditions: * - if a matching entry is found, return an Optional containing it