2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import akka.actor.ActorRef;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Verify;
13 import com.google.common.collect.Iterables;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.ArrayDeque;
16 import java.util.Iterator;
17 import java.util.Optional;
18 import java.util.Queue;
19 import javax.annotation.concurrent.NotThreadSafe;
20 import org.opendaylight.controller.cluster.access.concepts.Request;
21 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
22 import org.opendaylight.controller.cluster.access.concepts.RequestException;
23 import org.opendaylight.controller.cluster.access.concepts.Response;
24 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * This queue is internally split into two queues for performance reasons, both memory efficiency and copy operations.
33 * Entries are always appended to the end, but then they are transmitted to the remote end and do not necessarily
34 * complete in the order in which they were sent -- hence the head of the queue does not increase linearly,
35 * but can involve spurious removals of non-head entries.
38 * For memory efficiency we want to pre-allocate both queues -- which points to ArrayDeque, but that is very
39 * inefficient when entries are removed from the middle. In the typical case we expect the number of in-flight
40 * entries to be an order of magnitude lower than the number of enqueued entries, hence the split.
43 * Note that in the transient case of a reconnect, when the backend gives us a lower number of maximum in-flight entries
44 * than the previous incarnation, we may end up still moving the pending queue -- but that is a very exceptional
45 * scenario, hence we consciously ignore it to keep the design relatively simple.
48 * This class is not thread-safe, as it is expected to be guarded by {@link AbstractClientConnection}.
50 * @author Robert Varga
53 abstract class TransmitQueue {
// Queue variant used while the queue is halted: any attempt to transmit through it is rejected.
54 static final class Halted extends TransmitQueue {
// NOTE(review): the body of this method is not visible in this excerpt; presumably it reports no capacity -- confirm.
56 int canTransmitCount(final int inflightSize) {
// Transmission is not supported on a halted queue, so this always fails with UnsupportedOperationException.
61 TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
62 throw new UnsupportedOperationException("Attempted to transmit on a halted queue");
// Queue variant which actually sends requests, using the actor, version, session and limits supplied by a BackendInfo.
66 static final class Transmitting extends TransmitQueue {
67 private final BackendInfo backend;
// Sequence number stamped on the next outgoing envelope; post-incremented by every transmit() call.
68 private long nextTxSequence;
// The backend is mandatory: Preconditions.checkNotNull rejects a null argument.
70 Transmitting(final BackendInfo backend) {
71 this.backend = Preconditions.checkNotNull(backend);
// Remaining capacity is the backend's message limit minus the current in-flight count. The value is not clamped,
// so it is zero or negative when the in-flight count has reached or exceeded the limit.
75 int canTransmitCount(final int inflightSize) {
76 return backend.getMaxMessages() - inflightSize;
// Wraps the entry's request (converted to the backend's version) in an envelope carrying the backend session id
// and the next transmit sequence, records those identifiers in a TransmittedConnectionEntry and sends the envelope.
80 TransmittedConnectionEntry transmit(final ConnectionEntry entry, final long now) {
81 final RequestEnvelope env = new RequestEnvelope(entry.getRequest().toVersion(backend.getVersion()),
82 backend.getSessionId(), nextTxSequence++);
// The transmitted entry is built from the envelope's own session id and sequence, stamped with the 'now' argument.
84 final TransmittedConnectionEntry ret = new TransmittedConnectionEntry(entry, env.getSessionId(),
85 env.getTxSequence(), now);
// The envelope is sent to the backend actor without a sender reference.
86 backend.getActor().tell(env, ActorRef.noSender());
91 private static final Logger LOG = LoggerFactory.getLogger(TransmitQueue.class);
// Entries which have been handed to transmit(); complete() searches this queue first when a response arrives.
93 private final ArrayDeque<TransmittedConnectionEntry> inflight = new ArrayDeque<>();
// Entries waiting for transmit capacity; complete() polls from here once capacity becomes available.
94 private final ArrayDeque<ConnectionEntry> pending = new ArrayDeque<>();
// Set once by setForwarder(); when non-null, enqueue() passes new entries to it.
96 private ReconnectForwarder successor;
98 final Iterable<ConnectionEntry> asIterable() {
99 return Iterables.concat(inflight, pending);
// Receives timing data for a completed entry; complete() passes its 'now' argument, the entry's enqueued and
// transmit ticks and the execution time reported by the response envelope (in nanoseconds).
// NOTE(review): the body of this method is not visible in this excerpt.
102 private void recordCompletion(final long now, final long enqueuedTicks, final long transmitTicks,
103 final long execNanos) {
// Matches a response envelope against the queued requests, completes the matching entry and then tries to use
// the freed capacity for pending entries.
107 final void complete(final ResponseEnvelope<?> envelope, final long now) {
// Tri-state lookup: null means "no match here, keep looking", an empty Optional means "ignore this response".
108 Optional<TransmittedConnectionEntry> maybeEntry = findMatchingEntry(inflight, envelope);
109 if (maybeEntry == null) {
110 LOG.debug("Request for {} not found in inflight queue, checking pending queue", envelope);
111 maybeEntry = findMatchingEntry(pending, envelope);
// Neither queue produced a usable match: the response is logged and ignored.
114 if (maybeEntry == null || !maybeEntry.isPresent()) {
115 LOG.warn("No request matching {} found, ignoring response", envelope);
// A matching transmitted entry was found: complete it with the message carried by the envelope.
119 final TransmittedConnectionEntry entry = maybeEntry.get();
120 LOG.debug("Completing {} with {}", entry, envelope);
121 entry.complete(envelope.getMessage());
123 recordCompletion(now, entry.getEnqueuedTicks(), entry.getTxTicks(), envelope.getExecutionTimeNanos());
125 // We have freed up a slot, try to transmit something
126 int toSend = canTransmitCount(inflight.size());
// NOTE(review): the loop control and the statements that follow the log call are not visible in this excerpt.
128 final ConnectionEntry e = pending.poll();
133 LOG.debug("Transmitting entry {}", e);
// Accepts a new entry: it is handed to the successor if one is set, held back if there is no transmit capacity,
// and otherwise transmitted immediately and tracked as in-flight.
139 final void enqueue(final ConnectionEntry entry, final long now) {
// Once a forwarder has been installed, new entries are passed to it.
140 if (successor != null) {
141 successor.forwardEntry(entry, now);
// No capacity left according to canTransmitCount(): sending is delayed.
// NOTE(review): the statements handling the delayed entry are not visible in this excerpt.
145 if (canTransmitCount(inflight.size()) <= 0) {
146 LOG.trace("Queue is at capacity, delayed sending of request {}", entry.getRequest());
151 // We are not thread-safe and are supposed to be externally-guarded, hence send-before-record should be fine.
152 // This needs to be revisited if the external guards are lowered.
// transmit() runs first and its result is then appended to the in-flight queue.
153 inflight.offer(transmit(entry, now));
154 LOG.debug("Sent request {} on queue {}", entry.getRequest(), this);
// Reports how many more entries may be transmitted given the current in-flight count. Callers in this class treat
// a result of zero or less as "no capacity".
157 abstract int canTransmitCount(int inflightSize);
// Sends the entry and returns its transmitted counterpart; enqueue() stores the result in the in-flight queue.
159 abstract TransmittedConnectionEntry transmit(ConnectionEntry entry, long now);
161 final boolean isEmpty() {
162 return inflight.isEmpty() && pending.isEmpty();
// Looks at the head of the queue without removing anything: the in-flight head is examined first, with the head
// of the pending queue as the fallback result.
165 final ConnectionEntry peek() {
166 final ConnectionEntry ret = inflight.peek();
// NOTE(review): the handling of 'ret' is not visible in this excerpt; presumably it is returned when non-null -- confirm.
171 return pending.peek();
// Fails the queued requests with the supplied cause: the in-flight queue is processed first, then the pending queue.
174 final void poison(final RequestException cause) {
175 poisonQueue(inflight, cause);
176 poisonQueue(pending, cause);
179 final void setForwarder(final ReconnectForwarder forwarder, final long now) {
180 Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
181 successor = Preconditions.checkNotNull(forwarder);
182 LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
184 ConnectionEntry entry = inflight.poll();
185 while (entry != null) {
186 successor.forwardEntry(entry, now);
187 entry = inflight.poll();
190 entry = pending.poll();
191 while (entry != null) {
192 successor.forwardEntry(entry, now);
193 entry = pending.poll();
198 * We are using tri-state return here to indicate one of three conditions:
199 * - if a matching entry is found, return an Optional containing it
200 * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
201 * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
// Searches a queue for the entry answered by the given response envelope. An Optional holding the entry is
// returned for a full match and an empty Optional when a conflicting entry means the response must be ignored.
// Per the FindBugs justification below, a null return is part of the contract as well.
203 @SuppressFBWarnings(value = "NP_OPTIONAL_RETURN_NULL",
204 justification = "Returning null Optional is documented in the API contract.")
205 private static Optional<TransmittedConnectionEntry> findMatchingEntry(final Queue<? extends ConnectionEntry> queue,
206 final ResponseEnvelope<?> envelope) {
207 // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
208 // to use an iterator
209 final Iterator<? extends ConnectionEntry> it = queue.iterator();
210 while (it.hasNext()) {
211 final ConnectionEntry e = it.next();
212 final Request<?, ?> request = e.getRequest();
213 final Response<?, ?> response = envelope.getMessage();
215 // First check for matching target, or move to next entry
216 if (!request.getTarget().equals(response.getTarget())) {
220 // Sanity-check logical sequence, ignore any out-of-order messages
221 if (request.getSequence() != response.getSequence()) {
222 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
223 return Optional.empty();
226 // Check if the entry has (ever) been transmitted
// An entry that was never transmitted cannot be completed by a response, so the response is ignored.
227 if (!(e instanceof TransmittedConnectionEntry)) {
228 return Optional.empty();
231 final TransmittedConnectionEntry te = (TransmittedConnectionEntry) e;
233 // Now check session match
234 if (envelope.getSessionId() != te.getSessionId()) {
235 LOG.debug("Expecting session {}, ignoring response {}", te.getSessionId(), envelope);
236 return Optional.empty();
// The transmit sequence must match as well; a mismatch is logged at warn level, unlike the checks above.
238 if (envelope.getTxSequence() != te.getTxSequence()) {
239 LOG.warn("Expecting txSequence {}, ignoring response {}", te.getTxSequence(), envelope);
240 return Optional.empty();
// Target, sequence, session and transmit sequence all match: this is the entry being answered.
243 LOG.debug("Completing request {} with {}", request, envelope);
245 return Optional.of(te);
// Completes every entry of the queue with a failure response built from the entry's own request and the cause.
// NOTE(review): the end of this method is not visible in this excerpt.
251 private static void poisonQueue(final Queue<? extends ConnectionEntry> queue, final RequestException cause) {
252 for (ConnectionEntry e : queue) {
253 final Request<?, ?> request = e.getRequest();
// 'cause' has no matching placeholder; presumably SLF4J treats it as the exception to log -- confirm.
254 LOG.trace("Poisoning request {}", request, cause);
255 e.complete(request.toRequestFailure(cause));