2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import akka.actor.ActorRef;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Verify;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Collection;
17 import java.util.Deque;
18 import java.util.Iterator;
19 import java.util.Optional;
20 import java.util.Queue;
21 import javax.annotation.concurrent.NotThreadSafe;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
24 import org.opendaylight.controller.cluster.access.concepts.RequestException;
25 import org.opendaylight.controller.cluster.access.concepts.Response;
26 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * This queue is internally split into two queues for performance reasons, both memory efficiency and copy
35 * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
36 * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
37 * but can involve spurious removals of non-head entries.
40 * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
41 * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
42 * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
45 * Note that in transient case of reconnect, when the backend gives us a lower number of maximum in-flight entries
46 * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
47 * scenario, hence we consciously ignore it to keep the design relatively simple.
50 * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
52 * @author Robert Varga
55 abstract class TransmitQueue {
// Queue variant on which transmission is not permitted: transmit() unconditionally throws.
56 static final class Halted extends TransmitQueue {
57 Halted(final int targetDepth) {
// NOTE(review): the body of canTransmitCount() is not visible in this extract; presumably it reports
// that nothing can be transmitted -- confirm against the full file.
62 int canTransmitCount(final int inflightSize) {
// Reaching this method is treated as a caller error and reported via an unchecked exception.
67 TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
68 throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
// Queue variant bound to a backend: entries are wrapped into envelopes and sent to the backend's actor.
72 static final class Transmitting extends TransmitQueue {
73 private final BackendInfo backend;
// Sequence number stamped onto the next envelope; incremented once per transmit() call.
74 private long nextTxSequence;
76 Transmitting(final int targetDepth, final BackendInfo backend) {
// Fail fast on a null backend rather than on first use.
78 this.backend = Preconditions.checkNotNull(backend);
// Remaining send slots: the backend's message limit minus what is already in flight. The result is
// zero or negative when the limit has been reached or exceeded.
82 int canTransmitCount(final int inflightSize) {
83 return backend.getMaxMessages() - inflightSize;
87 TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
// Convert the request to the backend's version and wrap it with the session id and a fresh sequence.
88 final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
89 backend.getSessionId(), nextTxSequence++);
// Record session id, transmit sequence and timestamp alongside the original entry.
91 final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
92 env.getTxSequence(), now);
// Send without a sender reference.
93 backend.getActor().tell(env, ActorRef.noSender());
// NOTE(review): the return statement is not visible in this extract; presumably ret is returned -- confirm.
98 private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
// Entries which have been handed to transmit(), appended in transmission order.
100 private final Deque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
// Entries accepted by enqueue() which have not been transmitted yet.
101 private final Deque<ConnectionEntry> pending = new ArrayDeque<>();
// Notified via openTask()/closeTask() as entries enter and leave this queue.
102 private final ProgressTracker tracker;
// Null until setForwarder() runs; once set, enqueue() forwards new entries to it.
103 private ReconnectForwarder successor;
/**
 * Creates a queue whose progress is tracked by a new {@link AveragingProgressTracker}.
 *
 * @param targetDepth value passed to the tracker's constructor
 */
TransmitQueue(final int targetDepth) {
    this.tracker = new AveragingProgressTracker(targetDepth);
}
110 * Drain the contents of the connection into a collection. This will leave the queue empty and allow further
111 * entries to be added to it during replay. When we set the successor, all entries enqueued between when this
112 * method returns and the successor is set will be replayed to the successor.
114 * @return Collection of entries present in the queue.
116 final Collection<ConnectionEntry> drain() {
// Size the result up front so it can hold the contents of both queues.
117 final Collection<ConnectionEntry> ret = new ArrayDeque<>(inflight.size() + pending.size());
// In-flight entries are copied first.
118 ret.addAll(inflight);
// NOTE(review): the remainder of this method is not visible in this extract; presumably it also copies
// the pending entries, empties both queues and returns ret -- confirm against the full file.
125 final long ticksStalling(final long now) {
126 return tracker.ticksStalling(now);
129 final boolean hasSuccessor() {
130 return successor != null;
133 // If a matching request is found, the progress tracker is told that the corresponding task has been closed.
// Matches a response envelope against the queued entries and, on a match, closes the tracker task.
// Returns the matched entry, or an empty Optional when the response is to be ignored.
134 final Optional<TransmittedConnectionEntry> complete(final ResponseEnvelope<?> envelope, final long now) {
// findMatchingEntry() is tri-state: null means "no verdict here, keep looking in another queue".
135 Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
136 if (maybeEntry == null) {
137 LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
138 maybeEntry = findMatchingEntry(pending, envelope);
// Both "not found in either queue" (null) and "conflicting entry" (empty) end up ignoring the response.
141 if (maybeEntry == null || !maybeEntry.isPresent()) {
142 LOG.warn("No request matching {} found, ignoring response", envelope);
143 return Optional.empty();
146 final TransmittedConnectionEntry entry = maybeEntry.get();
147 tracker.closeTask(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
149 // We have freed up a slot, try to transmit something
// NOTE(review): the statement following the comment above is not visible in this extract; presumably
// it invokes tryTransmit(now) -- confirm against the full file.
152 return Optional.of(entry);
155 final void tryTransmit(final long now) {
156 final int toSend = canTransmitCount(inflight.size());
157 if (toSend > 0 && !pending.isEmpty()) {
158 transmitEntries(toSend, now);
// Takes up to maxTransmit entries from the head of the pending queue and transmits each of them.
162 private void transmitEntries(final int maxTransmit, final long now) {
163 for (int i = 0; i < maxTransmit; ++i) {
// poll() yields null once the pending queue is empty.
164 final ConnectionEntry e = pending.poll();
// NOTE(review): this statement logs i rather than maxTransmit; presumably it sits inside an elided
// "pending queue ran dry" branch which returns early -- confirm against the full file.
166 LOG.debug("Queue {} transmitted {} requests", this, i);
170 transmitEntry(e, now);
173 LOG.debug("Queue {} transmitted {} requests", this, maxTransmit);
176 private void transmitEntry(final ConnectionEntry entry, final long now) {
177 LOG.debug("Queue {} transmitting entry {}", entry);
178 // We are not thread-safe and are supposed to be externally-guarded,
179 // hence send-before-record should be fine.
180 // This needs to be revisited if the external guards are lowered.
181 inflight.addLast(transmit(entry, now));
185 * Enqueue an entry, possibly also transmitting it.
187 * @return Delay to be forced on the calling thread, in nanoseconds.
189 final long enqueue(final ConnectionEntry entry, final long now) {
// Once superseded, this queue no longer stores anything: the entry goes straight to the successor.
190 if (successor != null) {
191 successor.forwardEntry(entry, now);
// NOTE(review): the return ending this branch is not visible in this extract -- confirm its value.
195 // XXX: we should place a guard against incorrect entry sequences:
196 // entry.getEnqueuedTicks() should have non-negative difference from the last entry present in the queues
198 // Reserve an entry before we do anything that can fail
199 final long delay = tracker.openTask(now);
202 * This is defensive to make sure we do not do the wrong thing here and reorder messages if we ever happen
203 * to have available send slots and non-empty pending queue.
205 final int toSend = canTransmitCount(inflight.size());
// NOTE(review): the condition guarding the next two statements is elided; judging by the log message it
// is the "no send slots available" case -- confirm against the full file.
207 LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
208 pending.addLast(entry);
// Nothing is waiting ahead of this entry, so it can be transmitted directly.
212 if (pending.isEmpty()) {
213 transmitEntry(entry, now);
// Otherwise queue behind the waiting entries and transmit from the head of the pending queue.
217 pending.addLast(entry);
218 transmitEntries(toSend, now);
223 * Return the number of entries which can be transmitted assuming the supplied in-flight queue size.
// Callers in this class transmit only when the returned value is greater than zero.
225 abstract int canTransmitCount(int inflightSize);
// Performs the actual transmission of an entry; the returned object is what gets stored in the
// in-flight queue.
227 abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
229 final boolean isEmpty() {
230 return inflight.isEmpty() && pending.isEmpty();
// Looks at, without removing, an entry at the head of this queue.
233 final ConnectionEntry peek() {
// The in-flight queue is consulted first.
234 final ConnectionEntry ret = inflight.peek();
// NOTE(review): the lines between the lookup above and the fallback below are elided; presumably ret is
// returned when it is non-null -- confirm against the full file.
239 return pending.peek();
242 final void poison(final RequestException cause) {
243 poisonQueue(inflight, cause);
244 poisonQueue(pending, cause);
247 final void setForwarder(final ReconnectForwarder forwarder, final long now) {
248 Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
249 successor = Preconditions.checkNotNull(forwarder);
250 LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
253 * We need to account for entries which have been added between the time drain() was called and this method
254 * is invoked. Since the old connection is visible during replay and some entries may have completed on the
255 * replay thread, there was an avenue for this to happen.
258 ConnectionEntry entry = inflight.poll();
259 while (entry != null) {
260 successor.forwardEntry(entry, now);
261 entry = inflight.poll();
265 entry = pending.poll();
266 while (entry != null) {
267 successor.forwardEntry(entry, now);
268 entry = pending.poll();
272 LOG.debug("Connection {} queue spliced {} messages", this, count);
// Removes one entry from the queue and closes its task in the progress tracker.
275 final void remove(final long now) {
// poll() yields null when nothing is in flight.
276 final TransmittedConnectionEntry txe = inflight.poll();
// NOTE(review): the condition choosing between the two closeTask() calls is elided; presumably the
// pending queue is used only when txe is null -- confirm. Deque.pop() throws NoSuchElementException
// when the pending queue is empty.
278 final ConnectionEntry entry = pending.pop();
// An entry taken from the pending queue is closed with zeroes for the last two closeTask() arguments.
279 tracker.closeTask(now, entry.getEnqueuedTicks(), 0, 0);
281 tracker.closeTask(now, txe.getEnqueuedTicks(), txe.getTxTicks(), 0);
// NOTE(review): body not visible in this extract; presumably a test-only accessor returning the
// in-flight deque itself rather than a copy -- confirm against the full file.
286 Deque<TransmittedConnectionEntry> getInflight() {
// NOTE(review): body not visible in this extract; presumably a test-only accessor returning the
// pending deque itself rather than a copy -- confirm against the full file.
291 Deque<ConnectionEntry> getPending() {
296 * We are using tri-state return here to indicate one of three conditions:
297 * - if a matching entry is found, return an Optional containing it
298 * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
299 * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
301 @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
302 justification = "Returning null Optional is documented in the API contract.")
303 private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
304 final ResponseEnvelope<?> envelope) {
305 // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
306 // to use an iterator
307 final Iterator<? extends ConnectionEntry> it = queue.iterator();
308 while (it.hasNext()) {
309 final ConnectionEntry e = it.next();
310 final Request<?, ?> request = e.getRequest();
311 final Response<?, ?> response = envelope.getMessage();
313 // First check for matching target, or move to next entry
314 if (!request.getTarget().equals(response.getTarget())) {
// NOTE(review): the body of the target check is elided; per the comment above it presumably skips to
// the next entry -- confirm.
318 // Sanity-check logical sequence, ignore any out-of-order messages
319 if (request.getSequence() != response.getSequence()) {
320 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
321 return Optional.empty();
324 // Check if the entry has (ever) been transmitted
// An entry which was never transmitted cannot have a response: report a conflict.
325 if (!(e instanceof TransmittedConnectionEntry)) {
326 return Optional.empty();
329 final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
331 // Now check session match
332 if (envelope.getSessionId() != te.getSessionId()) {
333 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
334 return Optional.empty();
// The transmit sequence must match as well; a mismatch is logged at WARN rather than DEBUG.
336 if (envelope.getTxSequence() != te.getTxSequence()) {
337 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
338 return Optional.empty();
341 LOG.debug("Completing request {} with {}", request, envelope);
// NOTE(review): a line between the log statement and the return is elided; presumably the matched entry
// is removed from the queue through the iterator -- confirm.
343 return Optional.of(te);
// NOTE(review): the statement executed when the loop finds no entry for the target is not visible here;
// per the tri-state contract it presumably returns null -- confirm.
// Completes every entry of the supplied queue with a failure derived from the given cause.
349 private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
350 for (ConnectionEntry e : queue) {
351 final Request<?, ?> request = e.getRequest();
// cause is passed as the trailing argument after the single placeholder's value.
352 LOG.trace("Poisoning request {}", request, cause);
// The failure response is built by the request itself.
353 e.complete(request.toRequestFailure(cause));
// NOTE(review): anything following the loop lies beyond the visible extract; whether the queue is
// emptied afterwards cannot be told from here -- confirm against the full file.