2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.collect.ImmutableList;
14 import java.util.ArrayDeque;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.Queue;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.Executor;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import java.util.stream.Stream;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.yangtools.concepts.AbstractIdentifiable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * This class manages queuing and dispatching tasks for multiple workers concurrently. Tasks are queued on a per-worker
34 * basis and dispatched serially to each worker via an {@link Executor}.
36 * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
37 * task for a worker when there are pending tasks. On the first task(s), a queue is created and a dispatcher task is
38 * submitted to the executor to dispatch the queue to the associated worker. Any subsequent tasks that occur before all
39 * previous tasks have been dispatched are appended to the existing queue. When all tasks have been dispatched,
40 * the queue and dispatcher task are discarded.
42 * @author Thomas Pantelis
43 * @author Robert Varga
45 * @param <K> worker key type
46 * @param <T> task type
48 abstract class AbstractBatchingExecutor<K, T> extends AbstractIdentifiable<String> {
49 private static final Logger LOG = LoggerFactory.getLogger(AbstractBatchingExecutor.class);
52 * Caps the maximum time spent trying to offer tasks to a particular worker's queue. The deadline is computed once
53 * per offer from this value, so an offer times out after roughly 10 minutes.
55 private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
56 private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
57 private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
59 private final ConcurrentMap<K, DispatcherTask> dispatcherTasks = new ConcurrentHashMap<>();
60 private final @NonNull Executor executor;
61 private final int maxQueueCapacity;
63 AbstractBatchingExecutor(final @NonNull String name, final @NonNull Executor executor, final int maxQueueCapacity) {
65 this.executor = requireNonNull(executor);
66 checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
67 this.maxQueueCapacity = maxQueueCapacity;
71 * Returns the maximum worker queue capacity.
73 final int maxQueueCapacity() {
74 return maxQueueCapacity;
78 * Returns the {@link Executor} used for dispatcher tasks.
80 final @NonNull Executor executor() {
84 // FIXME: YANGTOOLS-1016: allow explicit blocking control
85 final void submitTask(final K key, final T task) {
86 submitTasks(key, Collections.singletonList(requireNonNull(task)));
89 // FIXME: YANGTOOLS-1016: allow explicit blocking control with return of un-enqueued tasks (or removal from input)
90 final void submitTasks(final K key, final Iterable<T> tasks) {
91 if (tasks == null || key == null) {
95 LOG.trace("{}: submitTasks for worker {}: {}", getIdentifier(), key, tasks);
97 // Keep looping until we are either able to add a new DispatcherTask or are able to add our tasks to an existing
98 // DispatcherTask. Eventually one or the other will occur.
100 Iterator<T> it = tasks.iterator();
103 DispatcherTask task = dispatcherTasks.get(key);
105 // No task found, try to insert a new one
106 final DispatcherTask newTask = new DispatcherTask(key, it);
107 task = dispatcherTasks.putIfAbsent(key, newTask);
109 // We were able to put our new task - now submit it to the executor and we're done. If it throws
110 // a RejectedExecutionException, let that propagate to the caller.
111 runTask(key, newTask);
115 // We have a racing task, hence we can continue, but we need to refresh our iterator from the task.
116 it = newTask.recoverItems();
119 final boolean completed = task.submitTasks(it);
121 // Task is indicating it is exiting before it has consumed all the items. Rather than spinning
122 // on removal, we try to replace it.
123 final DispatcherTask newTask = new DispatcherTask(key, it);
124 if (dispatcherTasks.replace(key, task, newTask)) {
125 runTask(key, newTask);
129 // We failed to replace the task, hence we need to retry. Note we have to recover the items to be
130 // published from the new task.
131 it = newTask.recoverItems();
132 LOG.debug("{}: retrying task queueing for {}", getIdentifier(), key);
136 // All tasks have either been delivered or we have timed out and warned about the ones we have failed
137 // to deliver. In any case we are done here.
140 } catch (InterruptedException e) {
141 // We were interrupted trying to offer to the worker's queue. Somebody's probably telling us to quit.
142 LOG.warn("{}: Interrupted trying to add to {} worker's queue", getIdentifier(), key);
145 LOG.trace("{}: submitTasks done for worker {}", getIdentifier(), key);
148 final Stream<DispatcherTask> streamTasks() {
149 return dispatcherTasks.values().stream();
152 abstract void executeBatch(K key, @NonNull ImmutableList<T> tasks) throws Exception;
154 private void runTask(final K key, final DispatcherTask task) {
155 LOG.debug("{}: Submitting DispatcherTask for worker {}", getIdentifier(), key);
156 executor.execute(task);
160 * Executor task for a single worker that queues tasks and sends them serially to the worker.
162 final class DispatcherTask implements Runnable {
163 private final Lock lock = new ReentrantLock();
164 private final Condition notEmpty = lock.newCondition();
165 private final Condition notFull = lock.newCondition();
166 private final @NonNull K key;
169 private final Queue<T> queue = new ArrayDeque<>();
171 private boolean exiting;
173 DispatcherTask(final @NonNull K key, final @NonNull Iterator<T> tasks) {
174 this.key = requireNonNull(key);
175 while (tasks.hasNext()) {
176 final T task = tasks.next();
183 @NonNull Iterator<T> recoverItems() {
184 // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
185 // get started, hence this is safe.
186 return queue.iterator();
202 boolean submitTasks(final @NonNull Iterator<T> tasks) throws InterruptedException {
203 final long start = System.nanoTime();
204 final long deadline = start + GIVE_UP_NANOS;
208 // Lock may have blocked for some time, we need to take that into account. We may have exceeded
209 // the deadline, but that is unlikely and even in that case we can make some progress without further
211 long canWait = deadline - System.nanoTime();
214 // Check the exiting flag - if true then #run is in the process of exiting so return false
215 // to indicate such. Otherwise, offer the tasks to the queue.
220 final int avail = maxQueueCapacity - queue.size();
223 LOG.warn("{}: Failed to offer tasks {} to the queue for worker {}. Exceeded "
224 + "maximum allowable time of {} minutes; the worker is likely in an unrecoverable "
225 + "state (deadlock or endless loop). ", getIdentifier(), ImmutableList.copyOf(tasks),
226 key, MAX_NOTIFICATION_OFFER_MINUTES);
230 canWait = notFull.awaitNanos(canWait);
234 for (int i = 0; i < avail; ++i) {
235 if (!tasks.hasNext()) {
240 queue.add(tasks.next());
249 private boolean waitForQueue() {
250 long timeout = TASK_WAIT_NANOS;
252 while (queue.isEmpty()) {
258 timeout = notEmpty.awaitNanos(timeout);
259 } catch (InterruptedException e) {
260 // The executor is probably shutting down so log as debug.
261 LOG.debug("{}: Interrupted trying to remove from {} worker's queue", getIdentifier(), key);
272 // Loop until we've dispatched all the tasks in the queue.
274 final @NonNull ImmutableList<T> tasks;
278 if (!waitForQueue()) {
283 // Splice the entire queue
284 tasks = ImmutableList.copyOf(queue);
295 // We're exiting, gracefully or not - either way make sure we always remove
296 // ourselves from the cache.
297 dispatcherTasks.remove(key, this);
301 @SuppressWarnings("checkstyle:illegalCatch")
302 private void invokeWorker(final @NonNull ImmutableList<T> tasks) {
303 LOG.debug("{}: Invoking worker {} with tasks: {}", getIdentifier(), key, tasks);
305 executeBatch(key, tasks);
306 } catch (Exception e) {
307 // We'll let any Exception from the worker slide and keep sending any remaining tasks.
308 LOG.error("{}: Error invoking worker {} with {}", getIdentifier(), key, tasks, e);