2 * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.access.client;
11 import com.google.common.base.Preconditions;
12 import javax.annotation.concurrent.NotThreadSafe;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
17 * Base class for tracking throughput and computing delays when processing stream of tasks.
19 * <p>The idea is to improve throughput in a typical request-response scenario.
20 * Multiple "user" threads are submitting requests to a "frontend". The frontend does some
21 * pre-processing and then sends requests to (usually one) "backend". The backend does the main work
22 * and replies to the frontend, which reports to the corresponding user.
23 * In terms of task processing, user threads are "opening" tasks and frontend is "closing" them.
24 * Latency of the backend may fluctuate wildly. To avoid backend running out of open tasks,
25 * frontend should maintain a queue of requests for users to submit tasks to.
26 * In order to avoid excessive memory consumption, there should be a back-pressure mechanism
27 * which blocks the user (submit) threads for appropriate durations.
28 * Users can tolerate moderately delayed responses, but they only tolerate small block (submit)
31 * <p>An ideal back-pressure algorithm would keep the queue reasonably full,
32 * while fairly delaying the user threads. In other words, backend idle time should be low,
33 * as well as user block time dispersion
34 * (as opposed to block time average, which is dictated by overall performance).
36 * <p>In order for an algorithm to compute reasonable wait times,
37 * various inputs can be useful, mostly related to timing of various stages of task processing.
38 * Methods of this class assume "enqueue and wait" usage, submit thread is supposed to block itself
39 * when asked to. The delay computation is pessimistic, it expects each participating thread
40 * to enqueue another task as soon as its delay time allows.
42 * <p>This class is to be used by a single frontend. This class is not thread safe,
43 * the frontend is responsible for guarding against conflicting access.
44 * Time is measured in ticks (nanoseconds), methods never look at current time,
45 * relying on {@code now} argument where appropriate.
46 * This means the sequence of {@code now} argument values is expected to be non-decreasing.
48 * <p>Input data used for tracking is tightly coupled with TransmitQueue#recordCompletion arguments.
50 * @author Vratko Polak
52 // TODO: Would bulk methods be less taxing than a loop of single task calls?
54 abstract class ProgressTracker {
55 private static final Logger LOG = LoggerFactory.getLogger(ProgressTracker.class);
58 * When no tasks have been closed yet, this will be used to estimate throughput.
60 private final long defaultTicksPerTask;
63 * Number of tasks closed so far.
65 private long tasksClosed = 0;
68 * Number of tasks so far, both open and closed.
70 private long tasksEncountered = 0;
73 * The most recent tick number when the number of open tasks has become non-positive.
75 private long lastIdle = Long.MIN_VALUE;
78 * The most recent tick number when a task has been closed.
80 private long lastClosed = Long.MIN_VALUE;
83 * Tick number when the farthest known wait time is over.
85 private long nearestAllowed = Long.MIN_VALUE;
88 * Number of ticks elapsed before lastIdle while there was at least one open task.
90 private long elapsedBeforeIdle = 0L;
95 * Construct an idle tracker with specified ticks per task value to use as default.
97 * @param ticksPerTask value to use as default
99 ProgressTracker(final long ticksPerTask) {
100 Preconditions.checkArgument(ticksPerTask >= 0);
101 defaultTicksPerTask = ticksPerTask;
105 * Construct a new tracker suitable for a new task queue related to a "reconnect".
107 * <p>When reconnecting to a new backend, tasks may need to be re-processed by the frontend,
108 * possibly resulting in a different number of tasks.
109 * Also, performance of the new backend can be different, but the performance of the previous backend
110 * is generally still a better estimate than defaults of a brand new tracker.
112 * <p>This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle),
113 * but other internal values should lead to a balanced performance
114 * after tasks opened in the source tracker are "replayed" into the new tracker.
116 * <p>In particular, this implementation keeps the number of closed tasks the same,
117 * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker.
119 * @param oldTracker the tracker used for the previously used backend
120 * @param now tick number corresponding to caller's present
122 ProgressTracker(final ProgressTracker oldTracker, final long now) {
123 this.defaultTicksPerTask = oldTracker.defaultTicksPerTask;
124 this.tasksEncountered = this.tasksClosed = oldTracker.tasksClosed;
125 this.lastClosed = oldTracker.lastClosed;
126 this.nearestAllowed = oldTracker.nearestAllowed; // Call cancelDebt explicitly if needed.
127 this.lastIdle = oldTracker.lastIdle;
128 this.elapsedBeforeIdle = oldTracker.elapsedBeforeIdle;
129 if (!oldTracker.isIdle()) {
134 // "Public" shared access (read-only) accessor-like methods
137 * Get number of tasks closed so far.
139 * @return number of tasks known to be finished already; the value never decreases
141 final long tasksClosed() {
146 * Get number of tasks so far, both open and closed.
148 * @return number of tasks encountered so far, open or finished; the value never decreases
150 final long tasksEncountered() {
151 return tasksEncountered;
155 * Get number of tasks currently open.
157 * @return number of tasks started but not finished yet
159 final long tasksOpen() { // TODO: Should we return int?
160 // TODO: Should we check the return value is non-negative?
161 return tasksEncountered - tasksClosed;
165 * When idle, there are no open tasks so no progress is made.
167 * @return {@code true} if every encountered task is already closed, {@code false} otherwise
169 final boolean isIdle() {
170 return tasksClosed >= tasksEncountered;
174 * Number of ticks elapsed (before now) since the last closed task while there was at least one open task.
176 * @param now tick number corresponding to caller's present
177 * @return number of ticks backend is neither idle nor responding
179 final long ticksStalling(final long now) {
180 return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
183 // Read only protected methods.
186 * Get the value of default ticks per task this instance was created to use.
188 * @return default ticks per task value
190 protected final long defaultTicksPerTask() {
191 return defaultTicksPerTask;
195 * One task is roughly estimated to take this long to close.
197 * @param now tick number corresponding to caller's present
198 * @return total ticks worked divided by closed tasks, or the default value if no closed tasks
200 protected final double ticksWorkedPerClosedTask(final long now) {
201 if (tasksClosed < 1) {
202 return defaultTicksPerTask;
204 return (double) ticksWorked(now) / tasksClosed;
207 // Read only private methods.
210 * Number of ticks elapsed (before now) while there was at least one open task.
212 * @param now tick number corresponding to caller's present
213 * @return number of ticks there was at least one task open
215 private long ticksWorked(final long now) {
216 return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
220 * Give an estimate of a tick number when there will be no accumulated delays.
222 * <p>The delays accumulated include one more open task.
223 * Basically, the return value corresponds to openTask() return value,
224 * but this gives an absolute time, instead of delay relative to now.
226 * @param now tick number corresponding to caller's present
227 * @return estimated tick number when all threads with opened tasks are done waiting
229 private long estimateAllowed(final long now) {
230 return Math.max(now, nearestAllowed + estimateIsolatedDelay(now));
233 // State-altering "public" methods.
236 * Track a task that is being closed.
238 * @param now tick number corresponding to caller's present
239 * @param enqueuedTicks see TransmitQueue#recordCompletion
240 * @param transmitTicks see TransmitQueue#recordCompletion
241 * @param execNanos see TransmitQueue#recordCompletion
243 final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
245 LOG.info("Attempted to close a task while no tasks are open");
247 unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
252 * Track a task that is being opened.
254 * @param now tick number corresponding to caller's present
255 * @return number of ticks (nanos) the caller thread should wait before opening another task
257 final long openTask(final long now) {
258 openTaskWithoutThrottle(now);
259 return reserveDelay(now);
263 * Set nearestAllowed value to now.
265 * <p>This is useful when a new backend has just connected,
266 * after a period of no working backend present.
267 * The accumulated delays should not limit future tasks.
268 * The queue fullness and the averaged backend performance are kept,
269 * as they should result in good enough estimations for new tasks.
271 * @param now tick number corresponding to caller's present
273 final void cancelDebt(final long now) {
274 nearestAllowed = now;
277 // Private state-altering methods.
280 * Compute the next delay and update nearestAllowed value accordingly.
282 * @param now tick number corresponding to caller's present
283 * @return number of ticks (nanos) the caller thread should wait before opening another task
285 private long reserveDelay(final long now) {
286 nearestAllowed = estimateAllowed(now);
287 return nearestAllowed - now;
291 * Track a task that is being closed.
293 * <p>This method does not verify there was any task open.
294 * This call can empty the collection of open tasks, that special case should be handled.
296 * @param now tick number corresponding to caller's present
297 * @param enqueuedTicks see TransmitQueue#recordCompletion
298 * @param transmitTicks see TransmitQueue#recordCompletion
299 * @param execNanos see TransmitQueue#recordCompletion
301 private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
302 final long execNanos) {
311 * Track a task that is being opened.
313 * <p>This method does not aggregate delays, allowing the caller to sidestep the throttling.
314 * This call can make the collection of open tasks non-empty, that special case should be handled.
316 * @param now tick number corresponding to caller's present
318 private void openTaskWithoutThrottle(final long now) {
320 transitFromIdle(now);
326 * Update lastIdle as a new "last" just happened.
328 private void transitFromIdle(final long now) {
329 lastIdle = Math.max(now, lastIdle);
333 * Update elapsedBeforeIdle as the "before" has just moved.
335 private void transitToIdle(final long now) {
336 elapsedBeforeIdle += Math.max(0, now - lastIdle);
339 // Protected abstract read-only methods.
342 * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
344 * @param now tick number corresponding to caller's present
345 * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
347 protected abstract long estimateIsolatedDelay(long now);