X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FProgressTracker.java;h=fed9d4c5d3d01d5763b1b6420450c10d7f197fc5;hb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;hp=25b7d7edc94f5b97cf22c61536db872850461b8b;hpb=b62d5a77da1c4e11d79d397ace8a8ffb96201556;p=controller.git diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java index 25b7d7edc9..fed9d4c5d3 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ProgressTracker.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.access.client; import com.google.common.base.Preconditions; -import javax.annotation.concurrent.NotThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,37 +16,39 @@ import org.slf4j.LoggerFactory; * Base class for tracking throughput and computing delays when processing stream of tasks. * *

The idea is to improve throughput in a typical request-response scenario. - * A "frontend" is sending requests towards "backend", backend is sending responses back to fronted. - * Both frontend and backend may be realized by multiple Java threads, - * so there may be multiple requests not yet responded to. - * In terms of taks processing, frontend is "opening" tasks and backend is "closing" them. + * Multiple "user" threads are submitting requests to a "frontend". The frontend does some + * pre-processing and then sends requests to (usually one) "backend". The backend does the main work + * and replies to the frontend, which reports to the the corresponding user. + * In terms of task processing, user threads are "opening" tasks and frontend is "closing" them. * Latency of the backend may fluctuate wildly. To avoid backend running out of open tasks, - * there should be a queue of requests frontend can add to. + * frontend should maintain a queue of requests for users to submit tasks to. * In order to avoid excessive memory consumption, there should be a back-pressure mechanism - * which blocks the frontend threads for appropriate durations. - * Frontend can tolerate moderately delayed responses, but it only tolerates small block times. + * which blocks the user (submit) threads for appropriate durations. + * Users can tolerate moderately delayed responses, but they only tolerate small block (submit) + * times. * *

An ideal back-pressure algorithm would keep the queue reasonably full, - * while fairly delaying the frontend threads. In other words, backend idle time should be low, - * as well as frontend block time dispersion + * while fairly delaying the user threads. In other words, backend idle time should be low, + * as well as user block time dispersion * (as opposed to block time average, which is dictated by overall performance). * *

In order for an algorithm to compute reasonable wait times, * various inputs can be useful, mostly related to timing of various stages of task processing. - * Methods of this class assume "enqueue and wait" usage. - * The delay computation is pessimistic, it expects each participating thread to enqueue another task - * as soon as its delay time allows. + * Methods of this class assume "enqueue and wait" usage, submit thread is supposed to block itself + * when asked to. The delay computation is pessimistic, it expects each participating thread + * to enqueue another task as soon as its delay time allows. * - *

This class is not thread safe, the callers are responsible for guarding against conflicting access. - * Time is measured in ticks (nanos), methods never look at current time, relying on {@code now} argument instead. - * This means the sequence of {$code now} argument values is expected to be non-decreasing. + *

This class is to be used by single frontend. This class is not thread safe, + * the frontend is responsible for guarding against conflicting access. + * Time is measured in ticks (nanoseconds), methods never look at current time, + * relying on {@code now} argument where appropriate. + * This means the sequence of {@code now} argument values is expected to be non-decreasing. * *

Input data used for tracking is tightly coupled with TransitQueue#recordCompletion arguments. * * @author Vratko Polak */ // TODO: Would bulk methods be less taxing than a loop of single task calls? -@NotThreadSafe abstract class ProgressTracker { private static final Logger LOG = LoggerFactory.getLogger(ProgressTracker.class); @@ -99,37 +100,43 @@ abstract class ProgressTracker { } /** - * Construct a copy of an existing tracker, all future tracking is fully independent. + * Construct a new tracker suitable for a new task queue related to a "reconnect". * - * @param tracker the instance to copy state from - */ - ProgressTracker(final ProgressTracker tracker) { - this.defaultTicksPerTask = tracker.defaultTicksPerTask; - this.tasksClosed = tracker.tasksClosed; - this.tasksEncountered = tracker.tasksEncountered; - this.lastClosed = tracker.lastClosed; - this.lastIdle = tracker.lastIdle; - this.nearestAllowed = tracker.nearestAllowed; - this.elapsedBeforeIdle = tracker.elapsedBeforeIdle; - } - - // Public shared access (read-only) accessor-like methods - - /** - * Get the value of default ticks per task this instance was created to use. + *

When reconnecting to a new backend, tasks may need to be re-processed by the frontend, + * possibly resulting in a different number of tasks. + * Also, performance of the new backend can be different, but the perforance of the previous backend + * is generally still better estimate than defaults of a brand new tracker. * - * @return default ticks per task value + *

This "inheritance constructor" creates a new tracker with no open tasks (thus initially idle), + * but other internal values should lead to a balanced performance + * after tasks opened in the source tracker are "replayed" into the new tracker. + * + *

In particular, this impementation keeps the number of closed tasks the same, + * and makes it so ticksWorkedPerClosedTask is initially the same as in the old tracker. + * + * @param oldTracker the tracker used for the previously used backend + * @param now tick number corresponding to caller's present */ - public final long defaultTicksPerTask() { - return defaultTicksPerTask; + ProgressTracker(final ProgressTracker oldTracker, final long now) { + this.defaultTicksPerTask = oldTracker.defaultTicksPerTask; + this.tasksEncountered = this.tasksClosed = oldTracker.tasksClosed; + this.lastClosed = oldTracker.lastClosed; + this.nearestAllowed = oldTracker.nearestAllowed; // Call cancelDebt explicitly if needed. + this.lastIdle = oldTracker.lastIdle; + this.elapsedBeforeIdle = oldTracker.elapsedBeforeIdle; + if (!oldTracker.isIdle()) { + transitToIdle(now); + } } + // "Public" shared access (read-only) accessor-like methods + /** * Get number of tasks closed so far. * * @return number of tasks known to be finished already; the value never decreases */ - public final long tasksClosed() { + final long tasksClosed() { return tasksClosed; } @@ -138,7 +145,7 @@ abstract class ProgressTracker { * * @return number of tasks encountered so far, open or finished; the value never decreases */ - public final long tasksEncountered() { + final long tasksEncountered() { return tasksEncountered; } @@ -147,7 +154,7 @@ abstract class ProgressTracker { * * @return number of tasks started but not finished yet */ - public final long tasksOpen() { + final long tasksOpen() { // TODO: Should we return int? // TODO: Should we check the return value is non-negative? return tasksEncountered - tasksClosed; } @@ -157,7 +164,7 @@ abstract class ProgressTracker { * * @return {@code true} if every encountered task is already closed, {@code false} otherwise */ - public boolean isIdle() { + final boolean isIdle() { return tasksClosed >= tasksEncountered; } @@ -167,18 +174,19 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return number of ticks backend is neither idle nor responding */ - public long ticksStalling(final long now) { + final long ticksStalling(final long now) { return isIdle() ? 0 : Math.max(now, lastClosed) - lastClosed; } + // Read only protected methods. + /** - * Number of ticks elapsed (before now) while there was at least one open task. + * Get the value of default ticks per task this instance was created to use. * - * @param now tick number corresponding to caller's present - * @return number of ticks there was at least one task open + * @return default ticks per task value */ - public long ticksWorked(final long now) { - return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle; + protected final long defaultTicksPerTask() { + return defaultTicksPerTask; } /** @@ -187,25 +195,23 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return total ticks worked divided by closed tasks, or the default value if no closed tasks */ - public double ticksWorkedPerClosedTask(final long now) { + protected final double ticksWorkedPerClosedTask(final long now) { if (tasksClosed < 1) { return defaultTicksPerTask; } return (double) ticksWorked(now) / tasksClosed; } + // Read only private methods. + /** - * Give an estimate of openTask() return value. - * - *

When the returned delay is positive, the caller thread should wait that time before opening additional task. - * - *

This method in general takes into account previously assigned delays to avoid overlaps. + * Number of ticks elapsed (before now) while there was at least one open task. * * @param now tick number corresponding to caller's present - * @return delay (in ticks) after which another openTask() is fair to be called by the same thread again + * @return number of ticks there was at least one task open */ - public long estimateDelay(final long now) { - return estimateAllowed(now) - now; + private long ticksWorked(final long now) { + return isIdle() ? elapsedBeforeIdle : Math.max(now, lastIdle) - lastIdle + elapsedBeforeIdle; } /** @@ -218,11 +224,11 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return estimated tick number when all threads with opened tasks are done waiting */ - public long estimateAllowed(final long now) { - return Math.max(now, nearestAllowed) + estimateIsolatedDelay(now); + private long estimateAllowed(final long now) { + return Math.max(now, nearestAllowed + estimateIsolatedDelay(now)); } - // State-altering public methods. + // State-altering "public" methods. /** * Track a task is being closed. @@ -232,11 +238,11 @@ abstract class ProgressTracker { * @param transmitTicks see TransitQueue#recordCompletion * @param execNanos see TransitQueue#recordCompletion */ - public void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) { + final void closeTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) { if (isIdle()) { LOG.info("Attempted to close a task while no tasks are open"); } else { - protectedCloseTask(now, enqueuedTicks, transmitTicks, execNanos); + unsafeCloseTask(now, enqueuedTicks, transmitTicks, execNanos); } } @@ -246,13 +252,27 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return number of ticks (nanos) the caller thread should wait before opening another task */ - public long openTask(final long now) { - protectedOpenTask(now); + final long openTask(final long now) { + openTaskWithoutThrottle(now); return reserveDelay(now); } - // Internal state-altering methods. Protected instead of private, - // allowing subclasses to weaken ad-hoc invariants of current implementation. + /** + * Set nearestAllowed value to now. + * + *

This is useful when new a backend has just connected, + * after a period of no working backend present. + * The accumulated delays should not limit future tasks. + * The queue fullness and the averaged backend performance are kept, + * as they should result in good enough estimations for new tasks. + * + * @param now tick number corresponding to caller's present + */ + final void cancelDebt(final long now) { + nearestAllowed = now; + } + + // Private state-altering methods. /** * Compute the next delay and update nearestAllowed value accordingly. @@ -260,7 +280,7 @@ abstract class ProgressTracker { * @param now tick number corresponding to caller's present * @return number of ticks (nanos) the caller thread should wait before opening another task */ - protected long reserveDelay(final long now) { + private long reserveDelay(final long now) { nearestAllowed = estimateAllowed(now); return nearestAllowed - now; } @@ -272,16 +292,16 @@ abstract class ProgressTracker { * This call can empty the collection of open tasks, that special case should be handled. * * @param now tick number corresponding to caller's present - * @param enqueuedTicks see TransitQueue#recordCompletion - * @param transmitTicks see TransitQueue#recordCompletion - * @param execNanos see TransitQueue#recordCompletion + * @param enqueuedTicks see TransmitQueue#recordCompletion + * @param transmitTicks see TransmitQueue#recordCompletion + * @param execNanos see TransmitQueue#recordCompletion */ - protected void protectedCloseTask(final long now, final long enqueuedTicks, final long transmitTicks, + private void unsafeCloseTask(final long now, final long enqueuedTicks, final long transmitTicks, final long execNanos) { tasksClosed++; lastClosed = now; if (isIdle()) { - elapsedBeforeIdle += now - lastIdle; + transitToIdle(now); } } @@ -293,18 +313,34 @@ abstract class ProgressTracker { * * @param now tick number corresponding to caller's present */ - protected void protectedOpenTask(final long now) { + private void openTaskWithoutThrottle(final long now) { if (isIdle()) { - lastIdle = Math.max(now, lastIdle); + transitFromIdle(now); } tasksEncountered++; } + /** + * Update lastIdle as a new "last" just hapened. + */ + private void transitFromIdle(final long now) { + lastIdle = Math.max(now, lastIdle); + } + + /** + * Update elapsedBeforeIdle as the "before" has jast moved. + */ + private void transitToIdle(final long now) { + elapsedBeforeIdle += Math.max(0, now - lastIdle); + } + + // Protected abstract read-only methods. + /** * Give an estimate of a fair delay, assuming delays caused by other opened tasks are ignored. * * @param now tick number corresponding to caller's present * @return delay (in ticks) after which another openTask() would be fair to be called by the same thread again */ - abstract long estimateIsolatedDelay(final long now); + protected abstract long estimateIsolatedDelay(long now); }