*/
package org.opendaylight.controller.cluster.access.client;
+import static com.google.common.base.Verify.verify;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.Queue;
-import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
+import org.opendaylight.controller.cluster.messaging.SliceOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @author Robert Varga
*/
-@NotThreadSafe
abstract class TransmitQueue {
static final class Halted extends TransmitQueue {
+ // For ConnectingClientConnection.
Halted(final int targetDepth) {
super(targetDepth);
}
+ // For ReconnectingClientConnection.
+ Halted(final TransmitQueue oldQueue, final long now) {
+ super(oldQueue, now);
+ }
+
@Override
int canTransmitCount(final int inflightSize) {
return 0;
}
@Override
- TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
+ Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
}
+
+ @Override
+ void preComplete(final ResponseEnvelope<?> envelope) {
+ }
}
static final class Transmitting extends TransmitQueue {
+ private static final long NOT_SLICING = -1;
+
private final BackendInfo backend;
+ private final MessageSlicer messageSlicer;
private long nextTxSequence;
-
- Transmitting(final int targetDepth, final BackendInfo backend) {
- super(targetDepth);
- this.backend = Preconditions.checkNotNull(backend);
+ private long currentSlicedEnvSequenceId = NOT_SLICING;
+
+ // For ConnectedClientConnection.
+ Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
+ final MessageSlicer messageSlicer) {
+ super(oldQueue, targetDepth, now);
+ this.backend = requireNonNull(backend);
+ this.messageSlicer = requireNonNull(messageSlicer);
}
@Override
}
@Override
- TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
- final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
+ Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
+ // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
+ // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
+ // Optional to indicate the request was not transmitted.
+ if (currentSlicedEnvSequenceId >= 0) {
+ return Optional.empty();
+ }
+
+ final Request<?, ?> request = entry.getRequest();
+ final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
backend.getSessionId(), nextTxSequence++);
- final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
- env.getTxSequence(), now);
- backend.getActor().tell(env, ActorRef.noSender());
- return ret;
+ if (request instanceof SliceableMessage) {
+ if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
+ .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
+ .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
+ "Failed to slice request " + request, t), 0L)).build())) {
+ // The request was sliced so record the envelope sequence id to prevent transmitting
+ // subsequent requests until slicing completes.
+ currentSlicedEnvSequenceId = env.getTxSequence();
+ }
+ } else {
+ backend.getActor().tell(env, ActorRef.noSender());
+ }
+
+ return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
+ env.getTxSequence(), now));
+ }
+
+ @Override
+ void preComplete(final ResponseEnvelope<?> envelope) {
+ if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
+ // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
+ // requests to be transmitted.
+ currentSlicedEnvSequenceId = NOT_SLICING;
+ }
}
}
private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
- private final ProgressTracker tracker;
+ private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits.
private ReconnectForwarder successor;
+ /**
+ * Construct initial transmitting queue.
+ */
TransmitQueue(final int targetDepth) {
tracker = new AveragingProgressTracker(targetDepth);
}
+ /**
+ * Construct new transmitting queue while inheriting timing data from the previous transmit queue instance.
+ */
+ TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
+ tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
+ }
+
+ /**
+ * Construct new transmitting queue while inheriting timing and size data from the previous transmit queue instance.
+ */
+ TransmitQueue(final TransmitQueue oldQueue, final long now) {
+ tracker = new AveragingProgressTracker(oldQueue.tracker, now);
+ }
+
+ /**
+ * Cancel the accumulated sum of delays as we expect the new backend to work now.
+ */
+ void cancelDebt(final long now) {
+ tracker.cancelDebt(now);
+ }
+
/**
* Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
* to be added to it during replay. When we set the successor all entries enqueued between when this methods
// If a matching request was found, this will track a task was closed.
final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
+ preComplete(envelope);
+
Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
if (maybeEntry == null) {
LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
private void transmitEntries(final int maxTransmit, final long now) {
for (int i = 0; i < maxTransmit; ++i) {
final ConnectionEntry e = pending.poll();
- if (e == null) {
+ if (e == null || !transmitEntry(e, now)) {
LOG.debug("Queue {} transmitted {} requests", this, i);
return;
}
-
- transmitEntry(e, now);
}
LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
}
- private void transmitEntry(final ConnectionEntry entry, final long now) {
- LOG.debug("Queue {} transmitting entry {}", entry);
+ private boolean transmitEntry(final ConnectionEntry entry, final long now) {
+ LOG.debug("Queue {} transmitting entry {}", this, entry);
// We are not thread-safe and are supposed to be externally-guarded,
// hence send-before-record should be fine.
// This needs to be revisited if the external guards are lowered.
- inflight.addLast(transmit(entry, now));
+ final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
+ if (!maybeTransmitted.isPresent()) {
+ return false;
+ }
+
+ inflight.addLast(maybeTransmitted.get());
+ return true;
}
- /**
- * Enqueue an entry, possibly also transmitting it.
- *
- * @return Delay to be forced on the calling thread, in nanoseconds.
- */
- final long enqueue(final ConnectionEntry entry, final long now) {
+ final long enqueueOrForward(final ConnectionEntry entry, final long now) {
if (successor != null) {
// This call will pay the enqueuing price, hence the caller does not have to
successor.forwardEntry(entry, now);
return 0;
}
+ return enqueue(entry, now);
+ }
+
+ final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
+ if (successor != null) {
+ successor.replayEntry(entry, now);
+ } else {
+ enqueue(entry, now);
+ }
+ }
+
+ /**
+ * Enqueue an entry, possibly also transmitting it.
+ *
+ * @return Delay to be forced on the calling thread, in nanoseconds.
+ */
+ private long enqueue(final ConnectionEntry entry, final long now) {
+
// XXX: we should place a guard against incorrect entry sequences:
// entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
}
if (pending.isEmpty()) {
- transmitEntry(entry, now);
+ if (!transmitEntry(entry, now)) {
+ LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
+ pending.addLast(entry);
+ }
+
return delay;
}
*/
abstract int canTransmitCount(int inflightSize);
- abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
+ abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
+
+ abstract void preComplete(ResponseEnvelope<?> envelope);
final boolean isEmpty() {
return inflight.isEmpty() && pending.isEmpty();
return pending.peek();
}
- final void poison(final RequestException cause) {
- poisonQueue(inflight, cause);
- poisonQueue(pending, cause);
+ final List<ConnectionEntry> poison() {
+ final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
+ entries.addAll(inflight);
+ inflight.clear();
+ entries.addAll(pending);
+ pending.clear();
+ return entries;
}
final void setForwarder(final ReconnectForwarder forwarder, final long now) {
- Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
- successor = Preconditions.checkNotNull(forwarder);
+ verify(successor == null, "Successor %s already set on connection %s", successor, this);
+ successor = requireNonNull(forwarder);
LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
/*
return null;
}
-
- private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
- for (ConnectionEntry e : queue) {
- final Request<?, ?> request = e.getRequest();
- LOG.trace("Poisoning request {}", request, cause);
- e.complete(request.toRequestFailure(cause));
- }
- queue.clear();
- }
}