2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import com.google.common.base.Verify;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.Optional;
17 import java.util.Queue;
18 import java.util.concurrent.CompletionStage;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.Nullable;
21 import javax.annotation.concurrent.NotThreadSafe;
22 import org.opendaylight.controller.cluster.access.concepts.Request;
23 import org.opendaylight.controller.cluster.access.concepts.RequestException;
24 import org.opendaylight.controller.cluster.access.concepts.Response;
25 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.duration.FiniteDuration;
31 * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
32 * retries and enqueues work as expected.
35 final class SequencedQueue {
36 private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
38 // Keep these constants in nanoseconds, as that prevents unnecessary conversions in the fast path
40 static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
42 static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
43 private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
44 TimeUnit.NANOSECONDS);
47 * Default number of permits we start with. This value is used when we start up only, once we resolve a backend
48 * we will use its advertised {@link BackendInfo#getMaxMessages()} forever, refreshing the value on each successful
51 private static final int DEFAULT_TX_LIMIT = 1000;
53 private final Ticker ticker;
54 private final Long cookie;
57 * We need to keep the sequence of operations towards the backend and rate-limit what we send out, possibly dealing
58 * with the limit changing between reconnects (which imply retransmission).
60 * We keep three queues: one for requests that have been sent to the last known backend (until we have a new one),
61 * one for requests that have been sent to the previous backend (and have not been transmitted to the current one),
62 * and one for requests which have not been transmitted at all.
64 * When transmitting we first try to drain the second queue and service the third one only when that becomes empty.
65 * When receiving, we look at the first two -- as the response has to match a transmitted request. Since responses
66 * can get re-ordered, we may end up receiving responses to previously-sent requests before we have a chance
67 * to retransmit -- hence the second queue.
69 private Queue<SequencedQueueEntry> currentInflight = new ArrayDeque<>();
70 private Queue<SequencedQueueEntry> lastInflight = new ArrayDeque<>();
71 private final Queue<SequencedQueueEntry> pending = new ArrayDeque<>();
74 * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
75 * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
78 private CompletionStage<? extends BackendInfo> backendProof;
79 private BackendInfo backend;
81 // This is not final because we need to be able to replace it.
82 private long txSequence;
84 private int lastTxLimit = DEFAULT_TX_LIMIT;
87 * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
89 private Object expectingTimer;
91 private long lastProgress;
93 // Updated from application thread
94 private volatile boolean notClosed = true;
96 SequencedQueue(final Long cookie, final Ticker ticker) {
97 this.cookie = Preconditions.checkNotNull(cookie);
98 this.ticker = Preconditions.checkNotNull(ticker);
99 lastProgress = ticker.read();
106 private void checkNotClosed() {
107 Preconditions.checkState(notClosed, "Queue %s is closed", this);
110 private long nextTxSequence() {
115 * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
116 * the following scenarios:
117 * 1) The request has been enqueued and transmitted. No further actions are necessary
118 * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
119 * 3) The request has been enqueued, but the caller needs to request resolution of backend information and that
120 * process needs to complete before transmission occurs
122 * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
123 * the scenarios above according to the following rules:
124 * - if it is null, the first case applies
125 * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
126 * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
127 * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
129 * @param request Request to be sent
130 * @param callback Callback to be invoked
131 * @return Optional duration with semantics described above.
133 @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
136 final long now = ticker.read();
137 final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
138 if (backend == null) {
139 LOG.debug("No backend available, request resolution");
141 return Optional.empty();
143 if (!lastInflight.isEmpty()) {
144 LOG.debug("Retransmit not yet complete, delaying request {}", request);
148 if (currentInflight.size() >= lastTxLimit) {
149 LOG.debug("Queue is at capacity, delayed sending of request {}", request);
155 currentInflight.offer(e);
156 LOG.debug("Enqueued request {} to queue {}", request, this);
158 e.retransmit(backend, nextTxSequence(), now);
159 if (expectingTimer == null) {
160 expectingTimer = now + REQUEST_TIMEOUT_NANOS;
161 return Optional.of(INITIAL_REQUEST_TIMEOUT);
168 * We are using tri-state return here to indicate one of three conditions:
169 * - if a matching entry is found, return an Optional containing it
170 * - if a matching entry is not found, but it makes sense to keep looking at other queues, return null
171 * - if a conflicting entry is encountered, indicating we should ignore this request, return an empty Optional
173 private static Optional<SequencedQueueEntry> findMatchingEntry(final Queue<SequencedQueueEntry> queue,
174 final ResponseEnvelope<?> envelope) {
175 // Try to find the request in a queue. Responses may legally come back in a different order, hence we need
176 // to use an iterator
177 final Iterator<SequencedQueueEntry> it = queue.iterator();
178 while (it.hasNext()) {
179 final SequencedQueueEntry e = it.next();
180 final TxDetails txDetails = Verify.verifyNotNull(e.getTxDetails());
182 final Request<?, ?> request = e.getRequest();
183 final Response<?, ?> response = envelope.getMessage();
185 // First check for matching target, or move to next entry
186 if (!request.getTarget().equals(response.getTarget())) {
190 // Sanity-check logical sequence, ignore any out-of-order messages
191 if (request.getSequence() != response.getSequence()) {
192 LOG.debug("Expecting sequence {}, ignoring response {}", request.getSequence(), envelope);
193 return Optional.empty();
196 // Now check session match
197 if (envelope.getSessionId() != txDetails.getSessionId()) {
198 LOG.debug("Expecting session {}, ignoring response {}", txDetails.getSessionId(), envelope);
199 return Optional.empty();
201 if (envelope.getTxSequence() != txDetails.getTxSequence()) {
202 LOG.warn("Expecting txSequence {}, ignoring response {}", txDetails.getTxSequence(), envelope);
203 return Optional.empty();
206 LOG.debug("Completing request {} with {}", request, envelope);
208 return Optional.of(e);
214 ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> envelope) {
215 Optional<SequencedQueueEntry> maybeEntry = findMatchingEntry(currentInflight, envelope);
216 if (maybeEntry == null) {
217 maybeEntry = findMatchingEntry(lastInflight, envelope);
220 if (maybeEntry == null || !maybeEntry.isPresent()) {
221 LOG.warn("No request matching {} found, ignoring response", envelope);
225 lastProgress = ticker.read();
226 final ClientActorBehavior ret = maybeEntry.get().complete(envelope.getMessage());
228 // We have freed up a slot, try to transmit something
229 if (backend != null) {
230 final int toSend = lastTxLimit - currentInflight.size();
239 private int transmitEntries(final Queue<SequencedQueueEntry> queue, final int count) {
243 final SequencedQueueEntry e = queue.poll();
248 LOG.debug("Transmitting entry {}", e);
249 e.retransmit(backend, nextTxSequence(), lastProgress);
256 private void runTransmit(final int count) {
259 // Process lastInflight first, possibly clearing it
260 if (!lastInflight.isEmpty()) {
261 toSend = transmitEntries(lastInflight, count);
262 if (lastInflight.isEmpty()) {
263 // We won't be needing the queue anymore, change it to specialized implementation
264 lastInflight = EmptyQueue.getInstance();
270 // Process pending next.
271 transmitEntries(pending, toSend);
274 Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
275 Preconditions.checkNotNull(backend);
276 if (!proof.equals(backendProof)) {
277 LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
278 return Optional.empty();
281 LOG.debug("Resolved backend {}", backend);
283 // We are un-blocking transmission, but we need to juggle the queues first to get retransmit order right
284 // and also not to exceed new limits
285 final Queue<SequencedQueueEntry> newLast = new ArrayDeque<>(currentInflight.size() + lastInflight.size());
286 newLast.addAll(currentInflight);
287 newLast.addAll(lastInflight);
288 lastInflight = newLast.isEmpty() ? EmptyQueue.getInstance() : newLast;
290 // Clear currentInflight, possibly compacting it
291 final int txLimit = backend.getMaxMessages();
292 if (lastTxLimit > txLimit) {
293 currentInflight = new ArrayDeque<>();
295 currentInflight.clear();
298 // We are ready to roll
299 this.backend = backend;
302 lastTxLimit = txLimit;
303 lastProgress = ticker.read();
305 // No pending requests, return
306 if (lastInflight.isEmpty() && pending.isEmpty()) {
307 return Optional.empty();
310 LOG.debug("Sending up to {} requests to backend {}", txLimit, backend);
312 runTransmit(lastTxLimit);
314 // Calculate next timer if necessary
315 if (expectingTimer == null) {
316 // Request transmission may have cost us some time. Recalculate timeout.
317 final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
318 expectingTimer = nextTicks;
319 return Optional.of(FiniteDuration.apply(nextTicks - lastProgress, TimeUnit.NANOSECONDS));
321 return Optional.empty();
325 boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
326 if (!proof.equals(backendProof)) {
327 LOG.debug("Setting resolution handle to {}", proof);
328 backendProof = proof;
331 LOG.trace("Already resolving handle {}", proof);
336 boolean hasCompleted() {
337 return !notClosed && currentInflight.isEmpty() && lastInflight.isEmpty() && pending.isEmpty();
341 * Check queue timeouts and return true if a timeout has occurred.
343 * @return True if a timeout occurred
344 * @throws NoProgressException if the queue failed to make progress for an extended
347 boolean runTimeout() throws NoProgressException {
348 expectingTimer = null;
349 final long now = ticker.read();
351 if (!currentInflight.isEmpty() || !lastInflight.isEmpty() || !pending.isEmpty()) {
352 final long ticksSinceProgress = now - lastProgress;
353 if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
354 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
355 TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
357 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
363 // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
364 final SequencedQueueEntry head = currentInflight.peek();
365 if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
367 LOG.debug("Queue {} invalidated backend info", this);
374 private static void poisonQueue(final Queue<SequencedQueueEntry> queue, final RequestException cause) {
375 queue.forEach(e -> e.poison(cause));
379 void poison(final RequestException cause) {
382 poisonQueue(currentInflight, cause);
383 poisonQueue(lastInflight, cause);
384 poisonQueue(pending, cause);
387 // FIXME: add a caller from ClientSingleTransaction