Remove JournalWriter.getLastEntry()
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ProgressTracker.java
1 /*
2  * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.access.client;
10
11 import com.google.common.base.Preconditions;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 /**
16  * Base class for tracking throughput and computing delays when processing stream of tasks.
17  *
18  * <p>The idea is to improve throughput in a typical request-response scenario.
19  * Multiple "user" threads are submitting requests to a "frontend". The frontend does some
20  * pre-processing and then sends requests to (usually one) "backend". The backend does the main work
21  * and replies to the frontend, which reports to the the corresponding user.
22  * In terms of task processing, user threads are "opening" tasks and frontend is "closing" them.
23  * Latency of the backend may fluctuate wildly. To avoid backend running out of open tasks,
24  * frontend should maintain a queue of requests for users to submit tasks to.
25  * In order to avoid excessive memory consumption, there should be a back-pressure mechanism
26  * which blocks the user (submit) threads for appropriate durations.
27  * Users can tolerate moderately delayed responses, but they only tolerate small block (submit)
28  * times.
29  *
30  * <p>An ideal back-pressure algorithm would keep the queue reasonably full,
31  * while fairly delaying the user threads. In other words, backend idle time should be low,
32  * as well as user block time dispersion
33  * (as opposed to block time average, which is dictated by overall performance).
34  *
35  * <p>In order for an algorithm to compute reasonable wait times,
36  * various inputs can be useful, mostly related to timing of various stages of task processing.
37  * Methods of this class assume "enqueue and wait" usage, submit thread is supposed to block itself
38  * when asked to. The delay computation is pessimistic, it expects each participating thread
39  * to enqueue another task as soon as its delay time allows.
40  *
41  * <p>This class is to be used by single frontend. This class is not thread safe,
42  * the frontend is responsible for guarding against conflicting access.
43  * Time is measured in ticks (nanoseconds), methods never look at current time,
44  * relying on {@code now} argument where appropriate.
45  * This means the sequence of {@code now} argument values is expected to be non-decreasing.
46  *
47  * <p>Input data used for tracking is tightly coupled with TransitQueue#recordCompletion arguments.
48  *
49  * @author Vratko Polak
50  */
51 // TODO: Would bulk methods be less taxing than a loop of single task calls?
52 abstract class ProgressTracker {
53     private static final Logger LOG = LoggerFactory.getLogger(ProgressTracker.class);
54
55     /**
56      * When no tasks has been closed yet, this will be used to estimate throughput.
57      */
58     private final long defaultTicksPerTask;
59
60     /**
61      * Number of tasks closed so far.
62      */
63     private long tasksClosed = 0;
64
65     /**
66      * Number of tasks so far, both open and closed.
67      */
68     private long tasksEncountered = 0;
69
70     /**
71      * The most recent tick number when the number of open tasks has become non-positive.
72      */
73     private long lastIdle = Long.MIN_VALUE;
74
75     /**
76      * The most recent tick number when a task has been closed.
77      */
78     private long lastClosed = Long.MIN_VALUE;
79
80     /**
81      * Tick number when the farthest known wait time is over.
82      */
83     private long nearestAllowed = Long.MIN_VALUE;
84
85     /**
86      * Number of ticks elapsed before lastIdle while there was at least one open task.
87      */
88     private long elapsedBeforeIdle = 0L;
89
90     // Constructors
91
92     /**
93      * Construct an idle tracker with specified ticks per task value to use as default.
94      *
95      * @param ticksPerTask value to use as default
96      */
97     ProgressTracker(final long ticksPerTask) {
98         Preconditions.checkArgument(ticksPerTask >= 0);
99         defaultTicksPerTask = ticksPerTask;
100     }
101
102     /**
103      * Construct a new tracker suitable for a new task queue related to a "reconnect".
104      *
105      * <p>When reconnecting to a new backend, tasks may need to be re-processed by the frontend,
106      * possibly resulting in a different number of tasks.
107      * Also, performance of the new backend can be different, but the perforance of the previous backend
108      * is generally still better estimate than defaults of a brand new tracker.
109      *
110      * <p>This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle),
111      * but other internal values should lead to a balanced performance
112      * after tasks opened in the source tracker are "replayed" into the new tracker.
113      *
114      * <p>In particular, this impementation keeps the number of closed tasks the same,
115      * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker.
116      *
117      * @param oldTracker the tracker used for the previously used backend
118      * @param now tick number corresponding to caller's present
119      */
120     ProgressTracker(final ProgressTracker oldTracker, final long now) {
121         this.defaultTicksPerTask = oldTracker.defaultTicksPerTask;
122         this.tasksEncountered = this.tasksClosed = oldTracker.tasksClosed;
123         this.lastClosed = oldTracker.lastClosed;
124         this.nearestAllowed = oldTracker.nearestAllowed;  // Call cancelDebt explicitly if needed.
125         this.lastIdle = oldTracker.lastIdle;
126         this.elapsedBeforeIdle = oldTracker.elapsedBeforeIdle;
127         if (!oldTracker.isIdle()) {
128             transitToIdle(now);
129         }
130     }
131
132     // "Public" shared access (read-only) accessor-like methods
133
134     /**
135      * Get number of tasks closed so far.
136      *
137      * @return number of tasks known to be finished already; the value never decreases
138      */
139     final long tasksClosed() {
140         return tasksClosed;
141     }
142
143     /**
144      * Get umber of tasks so far, both open and closed.
145      *
146      * @return number of tasks encountered so far, open or finished; the value never decreases
147      */
148     final long tasksEncountered() {
149         return tasksEncountered;
150     }
151
152     /**
153      * Get number of tasks currently open.
154      *
155      * @return number of tasks started but not finished yet
156      */
157     final long tasksOpen() {  // TODO: Should we return int?
158         // TODO: Should we check the return value is non-negative?
159         return tasksEncountered - tasksClosed;
160     }
161
162     /**
163      * When idle, there are no open tasks so no progress is made.
164      *
165      * @return {@code true} if every encountered task is already closed, {@code false} otherwise
166      */
167     final boolean isIdle() {
168         return tasksClosed >= tasksEncountered;
169     }
170
171     /**
172      * Number of ticks elapsed (before now) since the last closed task while there was at least one open task.
173      *
174      * @param now tick number corresponding to caller's present
175      * @return number of ticks backend is neither idle nor responding
176      */
177     final long ticksStalling(final long now) {
178         return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed;
179     }
180
181     // Read only protected methods.
182
183     /**
184      * Get the value of default ticks per task this instance was created to use.
185      *
186      * @return default ticks per task value
187      */
188     protected final long defaultTicksPerTask() {
189         return defaultTicksPerTask;
190     }
191
192     /**
193      * One task is roughly estimated to take this long to close.
194      *
195      * @param now tick number corresponding to caller's present
196      * @return total ticks worked divided by closed tasks, or the default value if no closed tasks
197      */
198     protected final double ticksWorkedPerClosedTask(final long now) {
199         if (tasksClosed < 1) {
200             return defaultTicksPerTask;
201         }
202         return (double) ticksWorked(now) / tasksClosed;
203     }
204
205     // Read only private methods.
206
207     /**
208      * Number of ticks elapsed (before now) while there was at least one open task.
209      *
210      * @param now tick number corresponding to caller's present
211      * @return number of ticks there was at least one task open
212      */
213     private long ticksWorked(final long now) {
214         return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle;
215     }
216
217     /**
218      * Give an estimate of a tick number when there will be no accumulated delays.
219      *
220      * <p>The delays accumulated include one more open task.
221      * Basically, the return value corresponds to openTask() return value,
222      * but this gives an absolute time, instead of delay relative to now.
223      *
224      * @param now tick number corresponding to caller's present
225      * @return estimated tick number when all threads with opened tasks are done waiting
226      */
227     private long estimateAllowed(final long now) {
228         return Math.max(now, nearestAllowed + estimateIsolatedDelay(now));
229     }
230
231     // State-altering "public" methods.
232
233     /**
234      * Track a task is being closed.
235      *
236      * @param now tick number corresponding to caller's present
237      * @param enqueuedTicks see TransitQueue#recordCompletion
238      * @param transmitTicks see TransitQueue#recordCompletion
239      * @param execNanos see TransitQueue#recordCompletion
240      */
241     final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) {
242         if (isIdle()) {
243             LOG.info("Attempted to close a task while no tasks are open");
244         } else {
245             unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos);
246         }
247     }
248
249     /**
250      * Track a task that is being opened.
251      *
252      * @param now tick number corresponding to caller's present
253      * @return number of ticks (nanos) the caller thread should wait before opening another task
254      */
255     final long openTask(final long now) {
256         openTaskWithoutThrottle(now);
257         return reserveDelay(now);
258     }
259
260     /**
261      * Set nearestAllowed value to now.
262      *
263      * <p>This is useful when new a backend has just connected,
264      * after a period of no working backend present.
265      * The accumulated delays should not limit future tasks.
266      * The queue fullness and the averaged backend performance are kept,
267      * as they should result in good enough estimations for new tasks.
268      *
269      * @param now tick number corresponding to caller's present
270      */
271     final void cancelDebt(final long now) {
272         nearestAllowed = now;
273     }
274
275     // Private state-altering methods.
276
277     /**
278      * Compute the next delay and update nearestAllowed value accordingly.
279      *
280      * @param now tick number corresponding to caller's present
281      * @return number of ticks (nanos) the caller thread should wait before opening another task
282      */
283     private long reserveDelay(final long now) {
284         nearestAllowed = estimateAllowed(now);
285         return nearestAllowed - now;
286     }
287
288     /**
289      * Track a task is being closed.
290      *
291      * <p>This method does not verify there was any task open.
292      * This call can empty the collection of open tasks, that special case should be handled.
293      *
294      * @param now tick number corresponding to caller's present
295      * @param enqueuedTicks see TransmitQueue#recordCompletion
296      * @param transmitTicks see TransmitQueue#recordCompletion
297      * @param execNanos see TransmitQueue#recordCompletion
298      */
299     private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks,
300                 final long execNanos) {
301         tasksClosed++;
302         lastClosed = now;
303         if (isIdle()) {
304             transitToIdle(now);
305         }
306     }
307
308     /**
309      * Track a task is being opened.
310      *
311      * <p>This method does not aggregate delays, allowing the caller to sidestep the throttling.
312      * This call can make the collection of open tasks non-empty, that special case should be handled.
313      *
314      * @param now tick number corresponding to caller's present
315      */
316     private void openTaskWithoutThrottle(final long now) {
317         if (isIdle()) {
318             transitFromIdle(now);
319         }
320         tasksEncountered++;
321     }
322
323     /**
324      * Update lastIdle as a new "last" just hapened.
325      */
326     private void transitFromIdle(final long now) {
327         lastIdle = Math.max(now, lastIdle);
328     }
329
330     /**
331      * Update elapsedBeforeIdle as the "before" has jast moved.
332      */
333     private void transitToIdle(final long now) {
334         elapsedBeforeIdle += Math.max(0, now - lastIdle);
335     }
336
337     // Protected abstract read-only methods.
338
339     /**
340      * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored.
341      *
342      * @param now tick number corresponding to caller's present
343      * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again
344      */
345     protected abstract long estimateIsolatedDelay(long now);
346 }