X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FTransmitQueue.java;h=cc3da1e4503118ed5a064ceeb60fd34c4fe9e12d;hb=HEAD;hp=9ab80d0d0085df1ef612e48606079b4eae413ea0;hpb=e1c283de301355cb8fa3f7d4fa28a6dd0af501eb;p=controller.git
diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
index 9ab80d0d00..cc3da1e450 100644
--- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
+++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/TransmitQueue.java
@@ -7,21 +7,28 @@
*/
package org.opendaylight.controller.cluster.access.client;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
-import com.google.common.collect.Iterables;
+import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Deque;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.Queue;
-import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,34 +53,48 @@ import org.slf4j.LoggerFactory;
*
*
* This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
- *
- * @author Robert Varga
*/
-@NotThreadSafe
-abstract class TransmitQueue {
+abstract sealed class TransmitQueue {
static final class Halted extends TransmitQueue {
+ // For ConnectingClientConnection.
Halted(final int targetDepth) {
super(targetDepth);
}
+ // For ReconnectingClientConnection.
+ Halted(final TransmitQueue oldQueue, final long now) {
+ super(oldQueue, now);
+ }
+
@Override
int canTransmitCount(final int inflightSize) {
return 0;
}
@Override
- TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
+ Optional transmit(final ConnectionEntry entry, final long now) {
throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
}
+
+ @Override
+ void preComplete(final ResponseEnvelope> envelope) {
+ }
}
static final class Transmitting extends TransmitQueue {
+ private static final long NOT_SLICING = -1;
+
private final BackendInfo backend;
+ private final MessageSlicer messageSlicer;
private long nextTxSequence;
-
- Transmitting(final int targetDepth, final BackendInfo backend) {
- super(targetDepth);
- this.backend = Preconditions.checkNotNull(backend);
+ private long currentSlicedEnvSequenceId = NOT_SLICING;
+
+ // For ConnectedClientConnection.
+ Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
+ final MessageSlicer messageSlicer) {
+ super(oldQueue, targetDepth, now);
+ this.backend = requireNonNull(backend);
+ this.messageSlicer = requireNonNull(messageSlicer);
}
@Override
@@ -82,30 +103,95 @@ abstract class TransmitQueue {
}
@Override
- TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
- final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
+ Optional transmit(final ConnectionEntry entry, final long now) {
+ // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
+ // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
+ // Optional to indicate the request was not transmitted.
+ if (currentSlicedEnvSequenceId >= 0) {
+ return Optional.empty();
+ }
+
+ final Request, ?> request = entry.getRequest();
+ final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
backend.getSessionId(), nextTxSequence++);
- final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
- env.getTxSequence(), now);
- backend.getActor().tell(env, ActorRef.noSender());
- return ret;
+ if (request instanceof SliceableMessage) {
+ if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
+ .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
+ .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
+ "Failed to slice request " + request, t), 0L)).build())) {
+ // The request was sliced so record the envelope sequence id to prevent transmitting
+ // subsequent requests until slicing completes.
+ currentSlicedEnvSequenceId = env.getTxSequence();
+ }
+ } else {
+ backend.getActor().tell(env, ActorRef.noSender());
+ }
+
+ return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
+ env.getTxSequence(), now));
+ }
+
+ @Override
+ void preComplete(final ResponseEnvelope> envelope) {
+ if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
+ // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
+ // requests to be transmitted.
+ currentSlicedEnvSequenceId = NOT_SLICING;
+ }
}
}
private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
- private final ArrayDeque inflight = new ArrayDeque<>();
- private final ArrayDeque pending = new ArrayDeque<>();
- private final ProgressTracker tracker;
+ private final Deque inflight = new ArrayDeque<>();
+ private final Deque pending = new ArrayDeque<>();
+ // Cannot be just ProgressTracker as we are inheriting limits.
+ private final AveragingProgressTracker tracker;
private ReconnectForwarder successor;
+ /**
+ * Construct initial transmitting queue.
+ */
TransmitQueue(final int targetDepth) {
tracker = new AveragingProgressTracker(targetDepth);
}
- final Iterable asIterable() {
- return Iterables.concat(inflight, pending);
+ /**
+ * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
+ */
+ TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
+ tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
+ }
+
+ /**
+ * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
+ */
+ TransmitQueue(final TransmitQueue oldQueue, final long now) {
+ tracker = new AveragingProgressTracker(oldQueue.tracker, now);
+ }
+
+ /**
+ * Cancel the accumulated sum of delays as we expect the new backend to work now.
+ */
+ void cancelDebt(final long now) {
+ tracker.cancelDebt(now);
+ }
+
+ /**
+ * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
+ * to be added to it during replay. When we set the successor all entries enqueued between when this methods
+ * returns and the successor is set will be replayed to the successor.
+ *
+ * @return Collection of entries present in the queue.
+ */
+ final Collection drain() {
+ final Collection ret = new ArrayDeque<>(inflight.size() + pending.size());
+ ret.addAll(inflight);
+ ret.addAll(pending);
+ inflight.clear();
+ pending.clear();
+ return ret;
}
final long ticksStalling(final long now) {
@@ -118,6 +204,8 @@ abstract class TransmitQueue {
// If a matching request was found, this will track a task was closed.
final Optional complete(final ResponseEnvelope> envelope, final long now) {
+ preComplete(envelope);
+
Optional maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
@@ -129,23 +217,64 @@ abstract class TransmitQueue {
return Optional.empty();
}
- final TransmittedConnectionEntry entry = maybeEntry.get();
+ final TransmittedConnectionEntry entry = maybeEntry.orElseThrow();
tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
// We have freed up a slot, try to transmit something
- int toSend = canTransmitCount(inflight.size());
- while (toSend > 0) {
+ tryTransmit(now);
+
+ return Optional.of(entry);
+ }
+
+ final void tryTransmit(final long now) {
+ final int toSend = canTransmitCount(inflight.size());
+ if (toSend > 0 && !pending.isEmpty()) {
+ transmitEntries(toSend, now);
+ }
+ }
+
+ private void transmitEntries(final int maxTransmit, final long now) {
+ for (int i = 0; i < maxTransmit; ++i) {
final ConnectionEntry e = pending.poll();
- if (e == null) {
- break;
+ if (e == null || !transmitEntry(e, now)) {
+ LOG.debug("Queue {} transmitted {} requests", this, i);
+ return;
}
+ }
+
+ LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
+ }
- LOG.debug("Transmitting entry {}", e);
- transmit(e, now);
- toSend--;
+ private boolean transmitEntry(final ConnectionEntry entry, final long now) {
+ LOG.debug("Queue {} transmitting entry {}", this, entry);
+ // We are not thread-safe and are supposed to be externally-guarded,
+ // hence send-before-record should be fine.
+ // This needs to be revisited if the external guards are lowered.
+ final Optional maybeTransmitted = transmit(entry, now);
+ if (!maybeTransmitted.isPresent()) {
+ return false;
}
- return Optional.of(entry);
+ inflight.addLast(maybeTransmitted.orElseThrow());
+ return true;
+ }
+
+ final long enqueueOrForward(final ConnectionEntry entry, final long now) {
+ if (successor != null) {
+ // This call will pay the enqueuing price, hence the caller does not have to
+ successor.forwardEntry(entry, now);
+ return 0;
+ }
+
+ return enqueue(entry, now);
+ }
+
+ final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
+ if (successor != null) {
+ successor.replayEntry(entry, now);
+ } else {
+ enqueue(entry, now);
+ }
}
/**
@@ -153,27 +282,36 @@ abstract class TransmitQueue {
*
* @return Delay to be forced on the calling thread, in nanoseconds.
*/
- final long enqueue(final ConnectionEntry entry, final long now) {
- if (successor != null) {
- successor.forwardEntry(entry, now);
- return 0;
- }
+ private long enqueue(final ConnectionEntry entry, final long now) {
// XXX: we should place a guard against incorrect entry sequences:
// entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
// Reserve an entry before we do anything that can fail
final long delay = tracker.openTask(now);
- if (canTransmitCount(inflight.size()) <= 0) {
+
+ /*
+ * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
+ * to have available send slots and non-empty pending queue.
+ */
+ final int toSend = canTransmitCount(inflight.size());
+ if (toSend <= 0) {
LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
- pending.add(entry);
- } else {
- // We are not thread-safe and are supposed to be externally-guarded,
- // hence send-before-record should be fine.
- // This needs to be revisited if the external guards are lowered.
- inflight.offer(transmit(entry, now));
- LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
+ pending.addLast(entry);
+ return delay;
}
+
+ if (pending.isEmpty()) {
+ if (!transmitEntry(entry, now)) {
+ LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
+ pending.addLast(entry);
+ }
+
+ return delay;
+ }
+
+ pending.addLast(entry);
+ transmitEntries(toSend, now);
return delay;
}
@@ -182,7 +320,9 @@ abstract class TransmitQueue {
*/
abstract int canTransmitCount(int inflightSize);
- abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
+ abstract Optional transmit(ConnectionEntry entry, long now);
+
+ abstract void preComplete(ResponseEnvelope> envelope);
final boolean isEmpty() {
return inflight.isEmpty() && pending.isEmpty();
@@ -197,29 +337,63 @@ abstract class TransmitQueue {
return pending.peek();
}
- final void poison(final RequestException cause) {
- poisonQueue(inflight, cause);
- poisonQueue(pending, cause);
+ final List poison() {
+ final List entries = new ArrayList<>(inflight.size() + pending.size());
+ entries.addAll(inflight);
+ inflight.clear();
+ entries.addAll(pending);
+ pending.clear();
+ return entries;
}
final void setForwarder(final ReconnectForwarder forwarder, final long now) {
- Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
- successor = Preconditions.checkNotNull(forwarder);
+ verify(successor == null, "Successor %s already set on connection %s", successor, this);
+ successor = requireNonNull(forwarder);
LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
+ /*
+ * We need to account for entries which have been added between the time drain() was called and this method
+ * is invoked. Since the old connection is visible during replay and some entries may have completed on the
+ * replay thread, there was an avenue for this to happen.
+ */
+ int count = 0;
ConnectionEntry entry = inflight.poll();
while (entry != null) {
- successor.forwardEntry(entry, now);
+ successor.replayEntry(entry, now);
entry = inflight.poll();
+ count++;
}
entry = pending.poll();
while (entry != null) {
- successor.forwardEntry(entry, now);
+ successor.replayEntry(entry, now);
entry = pending.poll();
+ count++;
+ }
+
+ LOG.debug("Connection {} queue spliced {} messages", this, count);
+ }
+
+ final void remove(final long now) {
+ final TransmittedConnectionEntry txe = inflight.poll();
+ if (txe == null) {
+ final ConnectionEntry entry = pending.pop();
+ tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
+ } else {
+ tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
}
}
+ @VisibleForTesting
+ Deque getInflight() {
+ return inflight;
+ }
+
+ @VisibleForTesting
+ Deque getPending() {
+ return pending;
+ }
+
/*
* We are using tri-state return here to indicate one of three conditions:
* - if a matching entry is found, return an Optional containing it
@@ -250,12 +424,10 @@ abstract class TransmitQueue {
}
// Check if the entry has (ever) been transmitted
- if (!(e instanceof TransmittedConnectionEntry)) {
+ if (!(e instanceof TransmittedConnectionEntry te)) {
return Optional.empty();
}
- final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
-
// Now check session match
if (envelope.getSessionId() != te.getSessionId()) {
LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
@@ -273,14 +445,4 @@ abstract class TransmitQueue {
return null;
}
-
- private static void poisonQueue(final Queue extends ConnectionEntry> queue, final RequestException cause) {
- for (ConnectionEntry e : queue) {
- final Request, ?> request = e.getRequest();
- LOG.trace("Poisoning request {}", request, cause);
- e.complete(request.toRequestFailure(cause));
- }
- queue.clear();
- }
-
}