/* * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.yangtools.util.concurrent; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import com.google.common.collect.ImmutableList; import java.util.ArrayDeque; import java.util.Collections; import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import org.checkerframework.checker.lock.qual.GuardedBy; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.yangtools.concepts.AbstractIdentifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This class manages queuing and dispatching tasks for multiple workers concurrently. Tasks are queued on a per-worker * basis and dispatched serially to each worker via an {@link Executor}. * *

This class optimizes its memory footprint by only allocating and maintaining a queue and executor * task for a worker when there are pending tasks. On the first task(s), a queue is created and a dispatcher task is * submitted to the executor to dispatch the queue to the associated worker. Any subsequent tasks that occur before all * previous tasks have been dispatched are appended to the existing queue. When all tasks have been dispatched, * the queue and dispatcher task are discarded. * * @author Thomas Pantelis * @author Robert Varga * * @param worker key type * @param task type */ abstract class AbstractBatchingExecutor extends AbstractIdentifiable { private static final Logger LOG = LoggerFactory.getLogger(AbstractBatchingExecutor.class); /** * Caps the maximum number of attempts to offer a task to a particular worker. Each attempt window is 1 minute, so * an offer times out after roughly 10 minutes. */ private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10; private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES); private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10); private final ConcurrentMap dispatcherTasks = new ConcurrentHashMap<>(); private final @NonNull Executor executor; private final int maxQueueCapacity; AbstractBatchingExecutor(final @NonNull String name, final @NonNull Executor executor, final int maxQueueCapacity) { super(name); this.executor = requireNonNull(executor); checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity); this.maxQueueCapacity = maxQueueCapacity; } /** * Returns the maximum worker queue capacity. */ final int maxQueueCapacity() { return maxQueueCapacity; } /** * Returns the {@link Executor} to used for dispatcher tasks. */ final @NonNull Executor executor() { return executor; } // FIXME: YANGTOOLS-1016: allow explicit blocking control final void submitTask(final K key, final T task) { submitTasks(key, Collections.singletonList(requireNonNull(task))); } // FIXME: YANGTOOLS-1016: allow explicit blocking control with return of un-enqueued tasks (or removal from input) final void submitTasks(final K key, final Iterable tasks) { if (tasks == null || key == null) { return; } LOG.trace("{}: submitTasks for worker {}: {}", getIdentifier(), key, tasks); // Keep looping until we are either able to add a new DispatcherTask or are able to add our tasks to an existing // DispatcherTask. Eventually one or the other will occur. try { Iterator it = tasks.iterator(); while (true) { DispatcherTask task = dispatcherTasks.get(key); if (task == null) { // No task found, try to insert a new one final DispatcherTask newTask = new DispatcherTask(key, it); task = dispatcherTasks.putIfAbsent(key, newTask); if (task == null) { // We were able to put our new task - now submit it to the executor and we're done. If it throws // a RejectedExecutionException, let that propagate to the caller. runTask(key, newTask); break; } // We have a racing task, hence we can continue, but we need to refresh our iterator from the task. it = newTask.recoverItems(); } final boolean completed = task.submitTasks(it); if (!completed) { // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather // than spinning on removal, we try to replace it. final DispatcherTask newTask = new DispatcherTask(key, it); if (dispatcherTasks.replace(key, task, newTask)) { runTask(key, newTask); break; } // We failed to replace the task, hence we need retry. Note we have to recover the items to be // published from the new task. it = newTask.recoverItems(); LOG.debug("{}: retrying task queueing for {}", getIdentifier(), key); continue; } // All tasks have either been delivered or we have timed out and warned about the ones we have failed // to deliver. In any case we are done here. break; } } catch (InterruptedException e) { // We were interrupted trying to offer to the worker's queue. Somebody's probably telling us to quit. LOG.warn("{}: Interrupted trying to add to {} worker's queue", getIdentifier(), key); } LOG.trace("{}: submitTasks done for worker {}", getIdentifier(), key); } final Stream streamTasks() { return dispatcherTasks.values().stream(); } abstract void executeBatch(K key, @NonNull ImmutableList tasks) throws Exception; private void runTask(final K key, final DispatcherTask task) { LOG.debug("{}: Submitting DispatcherTask for worker {}", getIdentifier(), key); executor.execute(task); } /** * Executor task for a single worker that queues tasks and sends them serially to the worker. */ final class DispatcherTask implements Runnable { private final Lock lock = new ReentrantLock(); private final Condition notEmpty = lock.newCondition(); private final Condition notFull = lock.newCondition(); private final @NonNull K key; @GuardedBy("lock") private final Queue queue = new ArrayDeque<>(); @GuardedBy("lock") private boolean exiting; DispatcherTask(final @NonNull K key, final @NonNull Iterator tasks) { this.key = requireNonNull(key); while (tasks.hasNext()) { final T task = tasks.next(); if (task != null) { queue.add(task); } } } @NonNull Iterator recoverItems() { // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never // get started, hence this is safe. return queue.iterator(); } @NonNull K key() { return key; } int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } boolean submitTasks(final @NonNull Iterator tasks) throws InterruptedException { final long start = System.nanoTime(); final long deadline = start + GIVE_UP_NANOS; lock.lock(); try { // Lock may have blocked for some time, we need to take that into account. We may have exceeded // the deadline, but that is unlikely and even in that case we can make some progress without further // blocking. long canWait = deadline - System.nanoTime(); while (true) { // Check the exiting flag - if true then #run is in the process of exiting so return false // to indicate such. Otherwise, offer the tasks to the queue. if (exiting) { return false; } final int avail = maxQueueCapacity - queue.size(); if (avail <= 0) { if (canWait <= 0) { LOG.warn("{}: Failed to offer tasks {} to the queue for worker {}. Exceeded " + "maximum allowable time of {} minutes; the worker is likely in an unrecoverable " + "state (deadlock or endless loop). ", getIdentifier(), ImmutableList.copyOf(tasks), key, MAX_NOTIFICATION_OFFER_MINUTES); return true; } canWait = notFull.awaitNanos(canWait); continue; } for (int i = 0; i < avail; ++i) { if (!tasks.hasNext()) { notEmpty.signal(); return true; } queue.add(tasks.next()); } } } finally { lock.unlock(); } } @GuardedBy("lock") private boolean waitForQueue() { long timeout = TASK_WAIT_NANOS; while (queue.isEmpty()) { if (timeout <= 0) { return false; } try { timeout = notEmpty.awaitNanos(timeout); } catch (InterruptedException e) { // The executor is probably shutting down so log as debug. LOG.debug("{}: Interrupted trying to remove from {} worker's queue", getIdentifier(), key); return false; } } return true; } @Override public void run() { try { // Loop until we've dispatched all the tasks in the queue. while (true) { final @NonNull ImmutableList tasks; lock.lock(); try { if (!waitForQueue()) { exiting = true; break; } // Splice the entire queue tasks = ImmutableList.copyOf(queue); queue.clear(); notFull.signalAll(); } finally { lock.unlock(); } invokeWorker(tasks); } } finally { // We're exiting, gracefully or not - either way make sure we always remove // ourselves from the cache. dispatcherTasks.remove(key, this); } } @SuppressWarnings("checkstyle:illegalCatch") private void invokeWorker(final @NonNull ImmutableList tasks) { LOG.debug("{}: Invoking worker {} with tasks: {}", getIdentifier(), key, tasks); try { executeBatch(key, tasks); } catch (Exception e) { // We'll let a RuntimeException from the worker slide and keep sending any remaining tasks. LOG.error("{}: Error invoking worker {} with {}", getIdentifier(), key, tasks, e); } } } }