X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmitQueue.java;h=24264eeedff77bca598e58d4d3f4f4a6ac996c34;hp=e1c5589004ec6685b086efacc32972d890516d8b;hb=dafc95d149bc62f101de37e94b9b5e3526d4e87b;hpb=4dc3bb90f1db1c4ee3f87d72734bc3de4d1b801e diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index e1c5589004..24264eeedf 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -8,11 +8,13 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import com.google.common.collect.Iterables; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; import java.util.Iterator; import java.util.Optional; import java.util.Queue; @@ -52,6 +54,16 @@ import org.slf4j.LoggerFactory; @NotThreadSafe abstract class TransmitQueue { static final class Halted extends TransmitQueue { + // For ConnectingClientConnection. + Halted(final int targetDepth) { + super(targetDepth); + } + + // For ReconnectingClientConnection. + Halted(final TransmitQueue oldQueue, final long now) { + super(oldQueue, now); + } + @Override int canTransmitCount(final int inflightSize) { return 0; @@ -67,7 +79,9 @@ abstract class TransmitQueue { private final BackendInfo backend; private long nextTxSequence; - Transmitting(final BackendInfo backend) { + // For ConnectedClientConnection. + Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now) { + super(oldQueue, targetDepth, now); this.backend = Preconditions.checkNotNull(backend); } @@ -90,20 +104,64 @@ abstract class TransmitQueue { private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class); - private final ArrayDeque inflight = new ArrayDeque<>(); - private final ArrayDeque pending = new ArrayDeque<>(); - + private final Deque inflight = new ArrayDeque<>(); + private final Deque pending = new ArrayDeque<>(); + private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits. private ReconnectForwarder successor; - final Iterable asIterable() { - return Iterables.concat(inflight, pending); + /** + * Construct initial transmitting queue. + */ + TransmitQueue(final int targetDepth) { + tracker = new AveragingProgressTracker(targetDepth); } - private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks, - final long execNanos) { - // TODO: record + /** + * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance. + */ + TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) { + tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now); + } + + /** + * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance. + */ + TransmitQueue(final TransmitQueue oldQueue, final long now) { + tracker = new AveragingProgressTracker(oldQueue.tracker, now); + } + + /** + * Cancel the accumulated sum of delays as we expect the new backend to work now. + */ + void cancelDebt(final long now) { + tracker.cancelDebt(now); } + /** + * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries + * to be added to it during replay. When we set the successor all entries enqueued between when this methods + * returns and the successor is set will be replayed to the successor. + * + * @return Collection of entries present in the queue. + */ + final Collection drain() { + final Collection ret = new ArrayDeque<>(inflight.size() + pending.size()); + ret.addAll(inflight); + ret.addAll(pending); + inflight.clear(); + pending.clear(); + return ret; + } + + final long ticksStalling(final long now) { + return tracker.ticksStalling(now); + } + + final boolean hasSuccessor() { + return successor != null; + } + + // If a matching request was found, this will track a task was closed. final Optional complete(final ResponseEnvelope envelope, final long now) { Optional maybeEntry = findMatchingEntry(inflight, envelope); if (maybeEntry == null) { @@ -117,42 +175,85 @@ abstract class TransmitQueue { } final TransmittedConnectionEntry entry = maybeEntry.get(); - recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); + tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos()); // We have freed up a slot, try to transmit something - int toSend = canTransmitCount(inflight.size()); - while (toSend > 0) { + tryTransmit(now); + + return Optional.of(entry); + } + + final void tryTransmit(final long now) { + final int toSend = canTransmitCount(inflight.size()); + if (toSend > 0 && !pending.isEmpty()) { + transmitEntries(toSend, now); + } + } + + private void transmitEntries(final int maxTransmit, final long now) { + for (int i = 0; i < maxTransmit; ++i) { final ConnectionEntry e = pending.poll(); if (e == null) { - break; + LOG.debug("Queue {} transmitted {} requests", this, i); + return; } - LOG.debug("Transmitting entry {}", e); - transmit(e, now); - toSend--; + transmitEntry(e, now); } - return Optional.of(entry); + LOG.debug("Queue {} transmitted {} requests", this, maxTransmit); } - final void enqueue(final ConnectionEntry entry, final long now) { + private void transmitEntry(final ConnectionEntry entry, final long now) { + LOG.debug("Queue {} transmitting entry {}", this, entry); + // We are not thread-safe and are supposed to be externally-guarded, + // hence send-before-record should be fine. + // This needs to be revisited if the external guards are lowered. + inflight.addLast(transmit(entry, now)); + } + + /** + * Enqueue an entry, possibly also transmitting it. + * + * @return Delay to be forced on the calling thread, in nanoseconds. + */ + final long enqueue(final ConnectionEntry entry, final long now) { if (successor != null) { + // This call will pay the enqueuing price, hence the caller does not have to successor.forwardEntry(entry, now); - return; + return 0; } - if (canTransmitCount(inflight.size()) <= 0) { + // XXX: we should place a guard against incorrect entry sequences: + // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues + + // Reserve an entry before we do anything that can fail + final long delay = tracker.openTask(now); + + /* + * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen + * to have available send slots and non-empty pending queue. + */ + final int toSend = canTransmitCount(inflight.size()); + if (toSend <= 0) { LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest()); - pending.add(entry); - return; + pending.addLast(entry); + return delay; } - // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine. - // This needs to be revisited if the external guards are lowered. - inflight.offer(transmit(entry, now)); - LOG.debug("Sent request {} on queue {}", entry.getRequest(), this); + if (pending.isEmpty()) { + transmitEntry(entry, now); + return delay; + } + + pending.addLast(entry); + transmitEntries(toSend, now); + return delay; } + /** + * Return the number of entries which can be transmitted assuming the supplied in-flight queue size. + */ abstract int canTransmitCount(int inflightSize); abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now); @@ -176,23 +277,53 @@ abstract class TransmitQueue { } final void setForwarder(final ReconnectForwarder forwarder, final long now) { - Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this); + Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this); successor = Preconditions.checkNotNull(forwarder); LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); + /* + * We need to account for entries which have been added between the time drain() was called and this method + * is invoked. Since the old connection is visible during replay and some entries may have completed on the + * replay thread, there was an avenue for this to happen. + */ + int count = 0; ConnectionEntry entry = inflight.poll(); while (entry != null) { - successor.forwardEntry(entry, now); + successor.replayEntry(entry, now); entry = inflight.poll(); + count++; } entry = pending.poll(); while (entry != null) { - successor.forwardEntry(entry, now); + successor.replayEntry(entry, now); entry = pending.poll(); + count++; + } + + LOG.debug("Connection {} queue spliced {} messages", this, count); + } + + final void remove(final long now) { + final TransmittedConnectionEntry txe = inflight.poll(); + if (txe == null) { + final ConnectionEntry entry = pending.pop(); + tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0); + } else { + tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0); } } + @VisibleForTesting + Deque getInflight() { + return inflight; + } + + @VisibleForTesting + Deque getPending() { + return pending; + } + /* * We are using tri-state return here to indicate one of three conditions: * - if a matching entry is found, return an Optional containing it @@ -255,5 +386,4 @@ abstract class TransmitQueue { } queue.clear(); } - }