X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmitQueue.java;h=e4ee78c5391754c7020b1a385311c077f74c9479;hp=b2497fc7e71798afcde0b7bec2ca624e7c99df49;hb=e983d61d93fe2da50f9c4112fa28c7fe4ee5ffef;hpb=81eb5a1b66be757a09210c2830c9a1895db4f5b5 diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java index b2497fc7e7..e4ee78c539 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java @@ -11,19 +11,23 @@ import akka.actor.ActorRef; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import com.google.common.collect.Iterables; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; import java.util.Deque; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.Queue; -import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.Request; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; -import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.access.concepts.SliceableMessage; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; +import org.opendaylight.controller.cluster.messaging.SliceOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,31 +55,47 @@ import org.slf4j.LoggerFactory; * * @author Robert Varga */ -@NotThreadSafe abstract class TransmitQueue { static final class Halted extends TransmitQueue { + // For ConnectingClientConnection. Halted(final int targetDepth) { super(targetDepth); } + // For ReconnectingClientConnection. + Halted(final TransmitQueue oldQueue, final long now) { + super(oldQueue, now); + } + @Override int canTransmitCount(final int inflightSize) { return 0; } @Override - TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) { + Optional transmit(final ConnectionEntry entry, final long now) { throw new UnsupportedOperationException("Attempted to transmit on a halted queue"); } + + @Override + void preComplete(final ResponseEnvelope envelope) { + } } static final class Transmitting extends TransmitQueue { + private static final long NOT_SLICING = -1; + private final BackendInfo backend; + private final MessageSlicer messageSlicer; private long nextTxSequence; + private long currentSlicedEnvSequenceId = NOT_SLICING; - Transmitting(final int targetDepth, final BackendInfo backend) { - super(targetDepth); + // For ConnectedClientConnection. + Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now, + final MessageSlicer messageSlicer) { + super(oldQueue, targetDepth, now); this.backend = Preconditions.checkNotNull(backend); + this.messageSlicer = Preconditions.checkNotNull(messageSlicer); } @Override @@ -84,14 +104,42 @@ abstract class TransmitQueue { } @Override - TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) { - final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()), + Optional transmit(final ConnectionEntry entry, final long now) { + // If we're currently slicing a message we can't send any subsequent requests until slicing completes to + // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty + // Optional to indicate the request was not transmitted. + if (currentSlicedEnvSequenceId >= 0) { + return Optional.empty(); + } + + final Request request = entry.getRequest(); + final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()), backend.getSessionId(), nextTxSequence++); - final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(), - env.getTxSequence(), now); - backend.getActor().tell(env, ActorRef.noSender()); - return ret; + if (request instanceof SliceableMessage) { + if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget()) + .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor()) + .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException( + "Failed to slice request " + request, t), 0L)).build())) { + // The request was sliced so record the envelope sequence id to prevent transmitting + // subsequent requests until slicing completes. + currentSlicedEnvSequenceId = env.getTxSequence(); + } + } else { + backend.getActor().tell(env, ActorRef.noSender()); + } + + return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(), + env.getTxSequence(), now)); + } + + @Override + void preComplete(final ResponseEnvelope envelope) { + if (envelope.getTxSequence() == currentSlicedEnvSequenceId) { + // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent + // requests to be transmitted. + currentSlicedEnvSequenceId = NOT_SLICING; + } } } @@ -99,15 +147,51 @@ abstract class TransmitQueue { private final Deque inflight = new ArrayDeque<>(); private final Deque pending = new ArrayDeque<>(); - private final ProgressTracker tracker; + private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits. private ReconnectForwarder successor; + /** + * Construct initial transmitting queue. + */ TransmitQueue(final int targetDepth) { tracker = new AveragingProgressTracker(targetDepth); } - final Iterable asIterable() { - return Iterables.concat(inflight, pending); + /** + * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance. + */ + TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) { + tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now); + } + + /** + * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance. + */ + TransmitQueue(final TransmitQueue oldQueue, final long now) { + tracker = new AveragingProgressTracker(oldQueue.tracker, now); + } + + /** + * Cancel the accumulated sum of delays as we expect the new backend to work now. + */ + void cancelDebt(final long now) { + tracker.cancelDebt(now); + } + + /** + * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries + * to be added to it during replay. When we set the successor all entries enqueued between when this methods + * returns and the successor is set will be replayed to the successor. + * + * @return Collection of entries present in the queue. + */ + final Collection drain() { + final Collection ret = new ArrayDeque<>(inflight.size() + pending.size()); + ret.addAll(inflight); + ret.addAll(pending); + inflight.clear(); + pending.clear(); + return ret; } final long ticksStalling(final long now) { @@ -120,6 +204,8 @@ abstract class TransmitQueue { // If a matching request was found, this will track a task was closed. final Optional complete(final ResponseEnvelope envelope, final long now) { + preComplete(envelope); + Optional maybeEntry = findMatchingEntry(inflight, envelope); if (maybeEntry == null) { LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope); @@ -150,23 +236,45 @@ abstract class TransmitQueue { private void transmitEntries(final int maxTransmit, final long now) { for (int i = 0; i < maxTransmit; ++i) { final ConnectionEntry e = pending.poll(); - if (e == null) { + if (e == null || !transmitEntry(e, now)) { LOG.debug("Queue {} transmitted {} requests", this, i); return; } - - transmitEntry(e, now); } LOG.debug("Queue {} transmitted {} requests", this, maxTransmit); } - private void transmitEntry(final ConnectionEntry entry, final long now) { - LOG.debug("Queue {} transmitting entry {}", entry); + private boolean transmitEntry(final ConnectionEntry entry, final long now) { + LOG.debug("Queue {} transmitting entry {}", this, entry); // We are not thread-safe and are supposed to be externally-guarded, // hence send-before-record should be fine. // This needs to be revisited if the external guards are lowered. - inflight.addLast(transmit(entry, now)); + final Optional maybeTransmitted = transmit(entry, now); + if (!maybeTransmitted.isPresent()) { + return false; + } + + inflight.addLast(maybeTransmitted.get()); + return true; + } + + final long enqueueOrForward(final ConnectionEntry entry, final long now) { + if (successor != null) { + // This call will pay the enqueuing price, hence the caller does not have to + successor.forwardEntry(entry, now); + return 0; + } + + return enqueue(entry, now); + } + + final void enqueueOrReplay(final ConnectionEntry entry, final long now) { + if (successor != null) { + successor.replayEntry(entry, now); + } else { + enqueue(entry, now); + } } /** @@ -174,11 +282,7 @@ abstract class TransmitQueue { * * @return Delay to be forced on the calling thread, in nanoseconds. */ - final long enqueue(final ConnectionEntry entry, final long now) { - if (successor != null) { - successor.forwardEntry(entry, now); - return 0; - } + private long enqueue(final ConnectionEntry entry, final long now) { // XXX: we should place a guard against incorrect entry sequences: // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues @@ -198,7 +302,11 @@ abstract class TransmitQueue { } if (pending.isEmpty()) { - transmitEntry(entry, now); + if (!transmitEntry(entry, now)) { + LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest()); + pending.addLast(entry); + } + return delay; } @@ -212,7 +320,9 @@ abstract class TransmitQueue { */ abstract int canTransmitCount(int inflightSize); - abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now); + abstract Optional transmit(ConnectionEntry entry, long now); + + abstract void preComplete(ResponseEnvelope envelope); final boolean isEmpty() { return inflight.isEmpty() && pending.isEmpty(); @@ -227,27 +337,41 @@ abstract class TransmitQueue { return pending.peek(); } - final void poison(final RequestException cause) { - poisonQueue(inflight, cause); - poisonQueue(pending, cause); + final List poison() { + final List entries = new ArrayList<>(inflight.size() + pending.size()); + entries.addAll(inflight); + inflight.clear(); + entries.addAll(pending); + pending.clear(); + return entries; } final void setForwarder(final ReconnectForwarder forwarder, final long now) { - Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this); + Verify.verify(successor == null, "Successor %s already set on connection %s", successor, this); successor = Preconditions.checkNotNull(forwarder); LOG.debug("Connection {} superseded by {}, splicing queue", this, successor); + /* + * We need to account for entries which have been added between the time drain() was called and this method + * is invoked. Since the old connection is visible during replay and some entries may have completed on the + * replay thread, there was an avenue for this to happen. + */ + int count = 0; ConnectionEntry entry = inflight.poll(); while (entry != null) { - successor.forwardEntry(entry, now); + successor.replayEntry(entry, now); entry = inflight.poll(); + count++; } entry = pending.poll(); while (entry != null) { - successor.forwardEntry(entry, now); + successor.replayEntry(entry, now); entry = pending.poll(); + count++; } + + LOG.debug("Connection {} queue spliced {} messages", this, count); } final void remove(final long now) { @@ -323,13 +447,4 @@ abstract class TransmitQueue { return null; } - - private static void poisonQueue(final Queue queue, final RequestException cause) { - for (ConnectionEntry e : queue) { - final Request request = e.getRequest(); - LOG.trace("Poisoning request {}", request, cause); - e.complete(request.toRequestFailure(cause)); - } - queue.clear(); - } }