2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import static com.google.common.base.Verify.verify;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorRef;
14 import com.google.common.annotations.VisibleForTesting;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.util.ArrayDeque;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Deque;
20 import java.util.Iterator;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.Queue;
24 import org.opendaylight.controller.cluster.access.concepts.Request;
25 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
26 import org.opendaylight.controller.cluster.access.concepts.Response;
27 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
28 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
29 import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
30 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
31 import org.opendaylight.controller.cluster.messaging.SliceOptions;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
/**
 * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
 * operations.
 *
 * <p>
 * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
 * complete in the order in which they were sent -- hence the head of the queue does not advance linearly,
 * but can involve spurious removals of non-head entries.
 *
 * <p>
 * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
 * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
 * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
 *
 * <p>
 * Note that in the transient case of a reconnect, when the backend gives us a lower maximum number of in-flight
 * entries than the previous incarnation, we may still end up moving the pending queue -- but that is a very
 * exceptional scenario, hence we consciously ignore it to keep the design relatively simple.
 *
 * <p>
 * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
 *
 * @author Robert Varga
 */
59 abstract class TransmitQueue {
60 static final class Halted extends TransmitQueue {
61 // For ConnectingClientConnection.
62 Halted(final int targetDepth) {
66 // For ReconnectingClientConnection.
67 Halted(final TransmitQueue oldQueue, final long now) {
72 int canTransmitCount(final int inflightSize) {
77 Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
78 throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
82 void preComplete(final ResponseEnvelope<?> envelope) {
86 static final class Transmitting extends TransmitQueue {
87 private static final long NOT_SLICING = -1;
89 private final BackendInfo backend;
90 private final MessageSlicer messageSlicer;
91 private long nextTxSequence;
92 private long currentSlicedEnvSequenceId = NOT_SLICING;
94 // For ConnectedClientConnection.
95 Transmitting(final TransmitQueue oldQueue, final int targetDepth, final BackendInfo backend, final long now,
96 final MessageSlicer messageSlicer) {
97 super(oldQueue, targetDepth, now);
98 this.backend = requireNonNull(backend);
99 this.messageSlicer = requireNonNull(messageSlicer);
103 int canTransmitCount(final int inflightSize) {
104 return backend.getMaxMessages() - inflightSize;
108 Optional<TransmittedConnectionEntry> transmit(final ConnectionEntry entry, final long now) {
109 // If we're currently slicing a message we can't send any subsequent requests until slicing completes to
110 // avoid an out-of-sequence request envelope failure on the backend. In this case we return an empty
111 // Optional to indicate the request was not transmitted.
112 if (currentSlicedEnvSequenceId >= 0) {
113 return Optional.empty();
116 final Request<?, ?> request = entry.getRequest();
117 final RequestEnvelope env = new RequestEnvelope(request.toVersion(backend.getVersion()),
118 backend.getSessionId(), nextTxSequence++);
120 if (request instanceof SliceableMessage) {
121 if (messageSlicer.slice(SliceOptions.builder().identifier(request.getTarget())
122 .message(env).replyTo(request.getReplyTo()).sendTo(backend.getActor())
123 .onFailureCallback(t -> env.sendFailure(new RuntimeRequestException(
124 "Failed to slice request " + request, t), 0L)).build())) {
125 // The request was sliced so record the envelope sequence id to prevent transmitting
126 // subsequent requests until slicing completes.
127 currentSlicedEnvSequenceId = env.getTxSequence();
130 backend.getActor().tell(env, ActorRef.noSender());
133 return Optional.of(new TransmittedConnectionEntry(entry, env.getSessionId(),
134 env.getTxSequence(), now));
138 void preComplete(final ResponseEnvelope<?> envelope) {
139 if (envelope.getTxSequence() == currentSlicedEnvSequenceId) {
140 // Slicing completed for the prior request - clear the cached sequence id field to enable subsequent
141 // requests to be transmitted.
142 currentSlicedEnvSequenceId = NOT_SLICING;
147 private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
149 private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
150 private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
151 private final AveragingProgressTracker tracker; // Cannot be just ProgressTracker as we are inheriting limits.
152 private ReconnectForwarder successor;
/**
 * Creates a fresh queue with no inherited history.
 *
 * @param targetDepth depth the progress tracker aims for
 */
TransmitQueue(final int targetDepth) {
    this.tracker = new AveragingProgressTracker(targetDepth);
}

/**
 * Creates a queue whose progress tracker carries over the timing data of {@code oldQueue}, but uses a new
 * target depth.
 *
 * @param oldQueue queue being replaced
 * @param targetDepth depth the progress tracker aims for
 * @param now current tick count
 */
TransmitQueue(final TransmitQueue oldQueue, final int targetDepth, final long now) {
    this.tracker = new AveragingProgressTracker(oldQueue.tracker, targetDepth, now);
}

/**
 * Creates a queue whose progress tracker carries over both the timing and the size data of {@code oldQueue}.
 *
 * @param oldQueue queue being replaced
 * @param now current tick count
 */
TransmitQueue(final TransmitQueue oldQueue, final long now) {
    this.tracker = new AveragingProgressTracker(oldQueue.tracker, now);
}
176 * Cancel the accumulated sum of delays as we expect the new backend to work now.
178 void cancelDebt(final long now) {
179 tracker.cancelDebt(now);
183 * Drain the contents of the connection into a list. This will leave the queue empty and allow further entries
184 * to be added to it during replay. When we set the successor all entries enqueued between when this methods
185 * returns and the successor is set will be replayed to the successor.
187 * @return Collection of entries present in the queue.
189 final Collection<ConnectionEntry> drain() {
190 final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
191 ret.addAll(inflight);
198 final long ticksStalling(final long now) {
199 return tracker.ticksStalling(now);
202 final boolean hasSuccessor() {
203 return successor != null;
206 // If a matching request was found, this will track a task was closed.
207 final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
208 preComplete(envelope);
210 Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
211 if (maybeEntry == null) {
212 LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
213 maybeEntry = findMatchingEntry(pending, envelope);
216 if (maybeEntry == null || !maybeEntry.isPresent()) {
217 LOG.warn("No request matching {} found, ignoring response", envelope);
218 return Optional.empty();
221 final TransmittedConnectionEntry entry = maybeEntry.get();
222 tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
224 // We have freed up a slot, try to transmit something
227 return Optional.of(entry);
230 final void tryTransmit(final long now) {
231 final int toSend = canTransmitCount(inflight.size());
232 if (toSend > 0 && !pending.isEmpty()) {
233 transmitEntries(toSend, now);
237 private void transmitEntries(final int maxTransmit, final long now) {
238 for (int i = 0; i < maxTransmit; ++i) {
239 final ConnectionEntry e = pending.poll();
240 if (e == null || !transmitEntry(e, now)) {
241 LOG.debug("Queue {} transmitted {} requests", this, i);
246 LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
249 private boolean transmitEntry(final ConnectionEntry entry, final long now) {
250 LOG.debug("Queue {} transmitting entry {}", this, entry);
251 // We are not thread-safe and are supposed to be externally-guarded,
252 // hence send-before-record should be fine.
253 // This needs to be revisited if the external guards are lowered.
254 final Optional<TransmittedConnectionEntry> maybeTransmitted = transmit(entry, now);
255 if (!maybeTransmitted.isPresent()) {
259 inflight.addLast(maybeTransmitted.get());
263 final long enqueueOrForward(final ConnectionEntry entry, final long now) {
264 if (successor != null) {
265 // This call will pay the enqueuing price, hence the caller does not have to
266 successor.forwardEntry(entry, now);
270 return enqueue(entry, now);
273 final void enqueueOrReplay(final ConnectionEntry entry, final long now) {
274 if (successor != null) {
275 successor.replayEntry(entry, now);
282 * Enqueue an entry, possibly also transmitting it.
284 * @return Delay to be forced on the calling thread, in nanoseconds.
286 private long enqueue(final ConnectionEntry entry, final long now) {
288 // XXX: we should place a guard against incorrect entry sequences:
289 // entry.getEnqueueTicks() should have non-negative difference from the last entry present in the queues
291 // Reserve an entry before we do anything that can fail
292 final long delay = tracker.openTask(now);
295 * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
296 * to have available send slots and non-empty pending queue.
298 final int toSend = canTransmitCount(inflight.size());
300 LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
301 pending.addLast(entry);
305 if (pending.isEmpty()) {
306 if (!transmitEntry(entry, now)) {
307 LOG.debug("Queue {} cannot transmit request {} - delaying it", this, entry.getRequest());
308 pending.addLast(entry);
314 pending.addLast(entry);
315 transmitEntries(toSend, now);
320 * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
322 abstract int canTransmitCount(int inflightSize);
324 abstract Optional<TransmittedConnectionEntry> transmit(ConnectionEntry entry, long now);
326 abstract void preComplete(ResponseEnvelope<?> envelope);
328 final boolean isEmpty() {
329 return inflight.isEmpty() && pending.isEmpty();
332 final ConnectionEntry peek() {
333 final ConnectionEntry ret = inflight.peek();
338 return pending.peek();
341 final List<ConnectionEntry> poison() {
342 final List<ConnectionEntry> entries = new ArrayList<>(inflight.size() + pending.size());
343 entries.addAll(inflight);
345 entries.addAll(pending);
350 final void setForwarder(final ReconnectForwarder forwarder, final long now) {
351 verify(successor == null, "Successor %s already set on connection %s", successor, this);
352 successor = requireNonNull(forwarder);
353 LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
356 * We need to account for entries which have been added between the time drain() was called and this method
357 * is invoked. Since the old connection is visible during replay and some entries may have completed on the
358 * replay thread, there was an avenue for this to happen.
361 ConnectionEntry entry = inflight.poll();
362 while (entry != null) {
363 successor.replayEntry(entry, now);
364 entry = inflight.poll();
368 entry = pending.poll();
369 while (entry != null) {
370 successor.replayEntry(entry, now);
371 entry = pending.poll();
375 LOG.debug("Connection {} queue spliced {} messages", this, count);
378 final void remove(final long now) {
379 final TransmittedConnectionEntry txe = inflight.poll();
381 final ConnectionEntry entry = pending.pop();
382 tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
384 tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
389 Deque<TransmittedConnectionEntry> getInflight() {
394 Deque<ConnectionEntry> getPending() {
399 * We are using tri-state return here to indicate one of three conditions:
400 * - if a matching entry is found, return an Optional containing it
401 * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
402 * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
404 @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
405 justification = "Returning null Optional is documented in the API contract.")
406 private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
407 final ResponseEnvelope<?> envelope) {
408 // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
409 // to use an iterator
410 final Iterator<? extends ConnectionEntry> it = queue.iterator();
411 while (it.hasNext()) {
412 final ConnectionEntry e = it.next();
413 final Request<?, ?> request = e.getRequest();
414 final Response<?, ?> response = envelope.getMessage();
416 // First check for matching target, or move to next entry
417 if (!request.getTarget().equals(response.getTarget())) {
421 // Sanity-check logical sequence, ignore any out-of-order messages
422 if (request.getSequence() != response.getSequence()) {
423 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
424 return Optional.empty();
427 // Check if the entry has (ever) been transmitted
428 if (!(e instanceof TransmittedConnectionEntry)) {
429 return Optional.empty();
432 final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
434 // Now check session match
435 if (envelope.getSessionId() != te.getSessionId()) {
436 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
437 return Optional.empty();
439 if (envelope.getTxSequence() != te.getTxSequence()) {
440 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
441 return Optional.empty();
444 LOG.debug("Completing request {} with {}", request, envelope);
446 return Optional.of(te);