15bfa569a22e4e8589e200d998d2588bb52b5471
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ProgressTracker.java
1 /*
2  * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.access.client;
10
11 import com.google.common.base.Preconditions;
12 import javax.annotation.concurrent.NotThreadSafe;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16 /**
17  * Base class for tracking throughput and computing delays when processing stream of tasks.
18  *
19  * <p>The idea is to improve throughput in a typical request-response scenario.
20  * A "frontend" is sending requests towards "backend", backend is sending responses back to fronted.
21  * Both frontend and backend may be realized by multiple Java threads,
22  * so there may be multiple requests not yet responded to.
23  * In terms of taks processing, frontend is "opening" tasks and backend is "closing" them.
24  * Latency of the backend may fluctuate wildly. To avoid backend running out of open tasks,
25  * there should be a queue of requests frontend can add to.
26  * In order to avoid excessive memory consumption, there should be a back-pressure mechanism
27  * which blocks the frontend threads for appropriate durations.
28  * Frontend can tolerate moderately delayed responses, but it only tolerates small block times.
29  *
30  * <p>An ideal back-pressure algorithm would keep the queue reasonably full,
31  * while fairly delaying the frontend threads. In other words, backend idle time should be low,
32  * as well as frontend block time dispersion
33  * (as opposed to block time average, which is dictated by overall performance).
34  *
35  * <p>In order for an algorithm to compute reasonable wait times,
36  * various inputs can be useful, mostly related to timing of various stages of task processing.
37  * Methods of this class assume "enqueue and wait" usage.
38  * The delay computation is pessimistic, it expects each participating thread to enqueue another task
39  * as soon as its delay time allows.
40  *
41  * <p>This class is not thread safe, the callers are responsible for guarding against conflicting access.
42  * Time is measured in ticks (nanos), methods never look at current time, relying on {@code now} argument instead.
43  * This means the sequence of {$code now} argument values is expected to be non-decreasing.
44  *
45  * <p>Input data used for tracking is tightly coupled with TransitQueue#recordCompletion arguments.
46  *
47  * @author Vratko Polak
48  */
49 // TODO: Would bulk methods be less taxing than a loop of single task calls?
50 @NotThreadSafe
51 abstract class ProgressTracker {
52     private static final Logger LOG = LoggerFactory.getLogger(ProgressTracker.class);
53
54     /**
55      * When no tasks has been closed yet, this will be used to estimate throughput.
56      */
57     private final long defaultTicksPerTask;
58
59     /**
60      * Number of tasks closed so far.
61      */
62     private long tasksClosed = 0;
63
64     /**
65      * Number of tasks so far, both open and closed.
66      */
67     private long tasksEncountered = 0;
68
69     /**
70      * The most recent tick number when the number of open tasks has become non-positive.
71      */
72     private long lastIdle = Long.MIN_VALUE;
73
74     /**
75      * The most recent tick number when a task has been closed.
76      */
77     private long lastClosed = Long.MIN_VALUE;
78
79     /**
80      * Tick number when the farthest known wait time is over.
81      */
82     private long nearestAllowed = Long.MIN_VALUE;
83
84     /**
85      * Number of ticks elapsed before lastIdle while there was at least one open task.
86      */
87     private long elapsedBeforeIdle = 0L;
88
89     // Constructors
90
91     /**
92      * Construct an idle tracker with specified ticks per task value to use as default.
93      *
94      * @param ticksPerTask value to use as default
95      */
96     ProgressTracker(final long ticksPerTask) {
97         Preconditions.checkArgument(ticksPerTask >= 0);
98         defaultTicksPerTask = ticksPerTask;
99     }
100
101     /**
102      * Construct a new tracker suitable for a new task queue related to a "reconnect".
103      *
104      * <p>When reconnecting to a new backend, tasks may need to be re-processed by the frontend,
105      * possibly resulting in a different number of tasks.
106      * Also, performance of the new backend can be different, but the perforance of the previous backend
107      * is generally still better estimate than defaults of a brand new tracker.
108      *
109      * <p>This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle),
110      * but other internal values should lead to a balanced performance
111      * after tasks opened in the source tracker are "replayed" into the new tracker.
112      *
113      * <p>In particular, this impementation keeps the number of closed tasks the same,
114      * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker.
115      *
116      * @param oldTracker the tracker used for the previously used backend
117      * @param now tick number corresponding to caller's present
118      */
119     ProgressTracker(final ProgressTracker oldTracker, final long now) {
120         this.defaultTicksPerTask = oldTracker.defaultTicksPerTask;
121         this.tasksEncountered = this.tasksClosed = oldTracker.tasksClosed;
122         this.lastClosed = oldTracker.lastClosed;
123         this.nearestAllowed = oldTracker.nearestAllowed;  // Call cancelDebt explicitly if needed.
124         this.lastIdle = oldTracker.lastIdle;
125         this.elapsedBeforeIdle = oldTracker.elapsedBeforeIdle;
126         if (!oldTracker.isIdle()) {
127             transitToIdle(now);
128         }
129     }
130
131     // "Public" shared access (read-only) accessor-like methods
132
133     /**
134      * Get number of tasks closed so far.
135      *
136      * @return number of tasks known to be finished already; the value never decreases
137      */
138     final long tasksClosed() {
139         return tasksClosed;
140     }
141
142     /**
143      * Get umber of tasks so far, both open and closed.
144      *
145      * @return number of tasks encountered so far, open or finished; the value never decreases
146      */
147     final long tasksEncountered() {
148         return tasksEncountered;
149     }
150
151     /**
152      * Get number of tasks currently open.
153      *
154      * @return number of tasks started but not finished yet
155      */
156     final long tasksOpen() {  // TODO: Should we return int?
157         // TODO: Should we check the return value is non-negative?
158         return tasksEncountered - tasksClosed;
159     }
160
161     /**
162      * When idle, there are no open tasks so no progress is made.
163      *
164      * @return {@code true} if every encountered task is already closed, {@code false} otherwise
165      */
166     final boolean isIdle() {
167         return tasksClosed >= tasksEncountered;
168     }
169
170     /**
171      * Number of ticks elapsed (before now) since the last closed task while there was at least one open task.
172      *
173      * @param now tick number corresponding to caller's present
174      * @return number of ticks backend is neither idle nor responding
175      */
176     final long ticksStalling(final long now) {
177         return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
178     }
179
180     // Read only protected methods.
181
182     /**
183      * Get the value of default ticks per task this instance was created to use.
184      *
185      * @return default ticks per task value
186      */
187     protected final long defaultTicksPerTask() {
188         return defaultTicksPerTask;
189     }
190
191     /**
192      * One task is roughly estimated to take this long to close.
193      *
194      * @param now tick number corresponding to caller's present
195      * @return total ticks worked divided by closed tasks, or the default value if no closed tasks
196      */
197     protected final double ticksWorkedPerClosedTask(final long now) {
198         if (tasksClosed < 1) {
199             return defaultTicksPerTask;
200         }
201         return (double) ticksWorked(now) / tasksClosed;
202     }
203
204     // Read only private methods.
205
206     /**
207      * Number of ticks elapsed (before now) while there was at least one open task.
208      *
209      * @param now tick number corresponding to caller's present
210      * @return number of ticks there was at least one task open
211      */
212     private long ticksWorked(final long now) {
213         return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
214     }
215
216     /**
217      * Give an estimate of a tick number when there will be no accumulated delays.
218      *
219      * <p>The delays accumulated include one more open task.
220      * Basically, the return value corresponds to openTask() return value,
221      * but this gives an absolute time, instead of delay relative to now.
222      *
223      * @param now tick number corresponding to caller's present
224      * @return estimated tick number when all threads with opened tasks are done waiting
225      */
226     private long estimateAllowed(final long now) {
227         return Math.max(now, nearestAllowed + estimateIsolatedDelay(now));
228     }
229
230     // State-altering "public" methods.
231
232     /**
233      * Track a task is being closed.
234      *
235      * @param now tick number corresponding to caller's present
236      * @param enqueuedTicks see TransitQueue#recordCompletion
237      * @param transmitTicks see TransitQueue#recordCompletion
238      * @param execNanos see TransitQueue#recordCompletion
239      */
240     final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
241         if (isIdle()) {
242             LOG.info("Attempted to close a task while no tasks are open");
243         } else {
244             unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
245         }
246     }
247
248     /**
249      * Track a task that is being opened.
250      *
251      * @param now tick number corresponding to caller's present
252      * @return number of ticks (nanos) the caller thread should wait before opening another task
253      */
254     final long openTask(final long now) {
255         openTaskWithoutThrottle(now);
256         return reserveDelay(now);
257     }
258
259     /**
260      * Set nearestAllowed value to now.
261      *
262      * <p>This is useful when new a backend has just connected,
263      * after a period of no working backend present.
264      * The accumulated delays should not limit future tasks.
265      * The queue fullness and the averaged backend performance are kept,
266      * as they should result in good enough estimations for new tasks.
267      *
268      * @param now tick number corresponding to caller's present
269      */
270     final void cancelDebt(final long now) {
271         nearestAllowed = now;
272     }
273
274     // Private state-altering methods.
275
276     /**
277      * Compute the next delay and update nearestAllowed value accordingly.
278      *
279      * @param now tick number corresponding to caller's present
280      * @return number of ticks (nanos) the caller thread should wait before opening another task
281      */
282     private long reserveDelay(final long now) {
283         nearestAllowed = estimateAllowed(now);
284         return nearestAllowed - now;
285     }
286
287     /**
288      * Track a task is being closed.
289      *
290      * <p>This method does not verify there was any task open.
291      * This call can empty the collection of open tasks, that special case should be handled.
292      *
293      * @param now tick number corresponding to caller's present
294      * @param enqueuedTicks see TransmitQueue#recordCompletion
295      * @param transmitTicks see TransmitQueue#recordCompletion
296      * @param execNanos see TransmitQueue#recordCompletion
297      */
298     private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
299                 final long execNanos) {
300         tasksClosed++;
301         lastClosed = now;
302         if (isIdle()) {
303             transitToIdle(now);
304         }
305     }
306
307     /**
308      * Track a task is being opened.
309      *
310      * <p>This method does not aggregate delays, allowing the caller to sidestep the throttling.
311      * This call can make the collection of open tasks non-empty, that special case should be handled.
312      *
313      * @param now tick number corresponding to caller's present
314      */
315     private void openTaskWithoutThrottle(final long now) {
316         if (isIdle()) {
317             transitFromIdle(now);
318         }
319         tasksEncountered++;
320     }
321
322     /**
323      * Update lastIdle as a new "last" just hapened.
324      */
325     private void transitFromIdle(final long now) {
326         lastIdle = Math.max(now, lastIdle);
327     }
328
329     /**
330      * Update elapsedBeforeIdle as the "before" has jast moved.
331      */
332     private void transitToIdle(final long now) {
333         elapsedBeforeIdle += Math.max(0, now - lastIdle);
334     }
335
336     // Protected abstract read-only methods.
337
338     /**
339      * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
340      *
341      * @param now tick number corresponding to caller's present
342      * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
343      */
344     protected abstract long estimateIsolatedDelay(long now);
345 }