2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.actors.client;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Ticker;
13 import java.util.Deque;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Optional;
17 import java.util.concurrent.CompletionStage;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.Nullable;
20 import javax.annotation.concurrent.NotThreadSafe;
21 import org.opendaylight.controller.cluster.access.concepts.Request;
22 import org.opendaylight.controller.cluster.access.concepts.RequestException;
23 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.duration.FiniteDuration;
29 * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
30 * retries and enqueues work as expected.
33 final class SequencedQueue {
34 private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
36 // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
38 static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
40 static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
41 private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
42 TimeUnit.NANOSECONDS);
45 * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
46 * progress at different speeds, these may be completed out of order.
48 * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
49 * figure out a more efficient lookup mechanism to deal with responses which do not match the queue
52 private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
53 private final Ticker ticker;
54 private final Long cookie;
56 // Updated/consulted from actor context only
58 * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
59 * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
62 private CompletionStage<? extends BackendInfo> backendProof;
63 private BackendInfo backend;
66 * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
68 private Object expectingTimer;
70 private long lastProgress;
72 // Updated from application thread
73 private volatile boolean notClosed = true;
75 SequencedQueue(final Long cookie, final Ticker ticker) {
76 this.cookie = Preconditions.checkNotNull(cookie);
77 this.ticker = Preconditions.checkNotNull(ticker);
78 lastProgress = ticker.read();
85 private void checkNotClosed() {
86 Preconditions.checkState(notClosed, "Queue %s is closed", this);
90 * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
91 * the following scenarios:
92 * 1) The request has been enqueued and transmitted. No further actions are necessary
93 * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
94 * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
95 * process needs to complete before transmission occurs
97 * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
98 * the scenarios above according to the following rules:
99 * - if is null, the first case applies
100 * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
101 * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
102 * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
104 * @param request Request to be sent
105 * @param callback Callback to be invoked
106 * @return Optional duration with semantics described above.
108 @Nullable Optional<FiniteDuration> enqueueRequest(final long sequence, final Request<?, ?> request,
109 final RequestCallback callback) {
112 final long now = ticker.read();
113 final SequencedQueueEntry e = new SequencedQueueEntry(request, sequence, callback, now);
116 LOG.debug("Enqueued request {} to queue {}", request, this);
118 if (backend == null) {
119 return Optional.empty();
122 e.retransmit(backend, now);
123 if (expectingTimer == null) {
124 expectingTimer = now + REQUEST_TIMEOUT_NANOS;
125 return Optional.of(INITIAL_REQUEST_TIMEOUT);
131 ClientActorBehavior complete(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
132 // Responses to different targets may arrive out of order, hence we use an iterator
133 final Iterator<SequencedQueueEntry> it = queue.iterator();
134 while (it.hasNext()) {
135 final SequencedQueueEntry e = it.next();
136 if (e.acceptsResponse(response)) {
137 lastProgress = ticker.read();
139 LOG.debug("Completing request {} with {}", e, response);
140 return e.complete(response.getMessage());
144 LOG.debug("No request matching {} found", response);
148 Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
149 if (!proof.equals(backendProof)) {
150 LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
151 return Optional.empty();
154 this.backend = Preconditions.checkNotNull(backend);
156 LOG.debug("Resolved backend {}", backend);
158 if (queue.isEmpty()) {
159 // No pending requests, hence no need for a timer
160 return Optional.empty();
163 LOG.debug("Resending requests to backend {}", backend);
164 final long now = ticker.read();
165 for (SequencedQueueEntry e : queue) {
166 e.retransmit(backend, now);
169 if (expectingTimer != null) {
170 // We already have a timer going, no need to schedule a new one
171 return Optional.empty();
174 // Above loop may have cost us some time. Recalculate timeout.
175 final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
176 expectingTimer = nextTicks;
177 return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
180 boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
181 if (!proof.equals(backendProof)) {
182 LOG.debug("Setting resolution handle to {}", proof);
183 backendProof = proof;
186 LOG.trace("Already resolving handle {}", proof);
191 boolean hasCompleted() {
192 return !notClosed && queue.isEmpty();
196 * Check queue timeouts and return true if a timeout has occured.
198 * @return True if a timeout occured
199 * @throws NoProgressException if the queue failed to make progress for an extended
202 boolean runTimeout() throws NoProgressException {
203 expectingTimer = null;
204 final long now = ticker.read();
206 if (!queue.isEmpty()) {
207 final long ticksSinceProgress = now - lastProgress;
208 if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
209 LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
210 TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
212 final NoProgressException ex = new NoProgressException(ticksSinceProgress);
218 // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
219 final SequencedQueueEntry head = queue.peek();
220 if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
222 LOG.debug("Queue {} invalidated backend info", this);
229 void poison(final RequestException cause) {
232 SequencedQueueEntry e = queue.poll();
239 // FIXME: add a caller from ClientSingleTransaction