Perform partial substatement initialization
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / AbstractBatchingExecutor.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.ImmutableList;
14 import java.util.ArrayDeque;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.Queue;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.Executor;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.Lock;
24 import java.util.concurrent.locks.ReentrantLock;
25 import java.util.stream.Stream;
26 import org.checkerframework.checker.lock.qual.GuardedBy;
27 import org.eclipse.jdt.annotation.NonNull;
28 import org.opendaylight.yangtools.concepts.AbstractIdentifiable;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This class manages queuing and dispatching tasks for multiple workers concurrently. Tasks are queued on a per-worker
34  * basis and dispatched serially to each worker via an {@link Executor}.
35  *
36  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
37  * task for a worker when there are pending tasks. On the first task(s), a queue is created and a dispatcher task is
38  * submitted to the executor to dispatch the queue to the associated worker. Any subsequent tasks that occur before all
39  * previous tasks have been dispatched are appended to the existing queue. When all tasks have been dispatched,
40  * the queue and dispatcher task are discarded.
41  *
42  * @author Thomas Pantelis
43  * @author Robert Varga
44  *
45  * @param <K> worker key type
46  * @param <T> task type
47  */
48 abstract class AbstractBatchingExecutor<K, T> extends AbstractIdentifiable<String> {
49     private static final Logger LOG = LoggerFactory.getLogger(AbstractBatchingExecutor.class);
50
51     /**
52      * Caps the maximum number of attempts to offer a task to a particular worker. Each attempt window is 1 minute, so
53      * an offer times out after roughly 10 minutes.
54      */
55     private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
56     private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
57     private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
58
59     private final ConcurrentMap<K, DispatcherTask> dispatcherTasks = new ConcurrentHashMap<>();
60     private final @NonNull Executor executor;
61     private final int maxQueueCapacity;
62
63     AbstractBatchingExecutor(final @NonNull String name, final @NonNull Executor executor, final int maxQueueCapacity) {
64         super(name);
65         this.executor = requireNonNull(executor);
66         checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
67         this.maxQueueCapacity = maxQueueCapacity;
68     }
69
70     /**
71      * Returns the maximum worker queue capacity.
72      */
73     final int maxQueueCapacity() {
74         return maxQueueCapacity;
75     }
76
77     /**
78      * Returns the {@link Executor} to used for dispatcher tasks.
79      */
80     final @NonNull Executor executor() {
81         return executor;
82     }
83
84     // FIXME: YANGTOOLS-1016: allow explicit blocking control
85     final void submitTask(final K key, final T task) {
86         submitTasks(key, Collections.singletonList(requireNonNull(task)));
87     }
88
89     // FIXME: YANGTOOLS-1016: allow explicit blocking control with return of un-enqueued tasks (or removal from input)
90     final void submitTasks(final K key, final Iterable<T> tasks) {
91         if (tasks == null || key == null) {
92             return;
93         }
94
95         LOG.trace("{}: submitTasks for worker {}: {}", getIdentifier(), key, tasks);
96
97         // Keep looping until we are either able to add a new DispatcherTask or are able to add our tasks to an existing
98         // DispatcherTask. Eventually one or the other will occur.
99         try {
100             Iterator<T> it = tasks.iterator();
101
102             while (true) {
103                 DispatcherTask task = dispatcherTasks.get(key);
104                 if (task == null) {
105                     // No task found, try to insert a new one
106                     final DispatcherTask newTask = new DispatcherTask(key, it);
107                     task = dispatcherTasks.putIfAbsent(key, newTask);
108                     if (task == null) {
109                         // We were able to put our new task - now submit it to the executor and we're done. If it throws
110                         // a RejectedExecutionException, let that propagate to the caller.
111                         runTask(key, newTask);
112                         break;
113                     }
114
115                     // We have a racing task, hence we can continue, but we need to refresh our iterator from the task.
116                     it = newTask.recoverItems();
117                 }
118
119                 final boolean completed = task.submitTasks(it);
120                 if (!completed) {
121                     // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
122                     // than spinning on removal, we try to replace it.
123                     final DispatcherTask newTask = new DispatcherTask(key, it);
124                     if (dispatcherTasks.replace(key, task, newTask)) {
125                         runTask(key, newTask);
126                         break;
127                     }
128
129                     // We failed to replace the task, hence we need retry. Note we have to recover the items to be
130                     // published from the new task.
131                     it = newTask.recoverItems();
132                     LOG.debug("{}: retrying task queueing for {}", getIdentifier(), key);
133                     continue;
134                 }
135
136                 // All tasks have either been delivered or we have timed out and warned about the ones we have failed
137                 // to deliver. In any case we are done here.
138                 break;
139             }
140         } catch (InterruptedException e) {
141             // We were interrupted trying to offer to the worker's queue. Somebody's probably telling us to quit.
142             LOG.warn("{}: Interrupted trying to add to {} worker's queue", getIdentifier(), key);
143         }
144
145         LOG.trace("{}: submitTasks done for worker {}", getIdentifier(), key);
146     }
147
148     final Stream<DispatcherTask> streamTasks() {
149         return dispatcherTasks.values().stream();
150     }
151
152     abstract void executeBatch(K key, @NonNull ImmutableList<T> tasks) throws Exception;
153
154     private void runTask(final K key, final DispatcherTask task) {
155         LOG.debug("{}: Submitting DispatcherTask for worker {}", getIdentifier(), key);
156         executor.execute(task);
157     }
158
159     /**
160      * Executor task for a single worker that queues tasks and sends them serially to the worker.
161      */
162     final class DispatcherTask implements Runnable {
163         private final Lock lock = new ReentrantLock();
164         private final Condition notEmpty = lock.newCondition();
165         private final Condition notFull = lock.newCondition();
166         private final @NonNull K key;
167
168         @GuardedBy("lock")
169         private final Queue<T> queue = new ArrayDeque<>();
170         @GuardedBy("lock")
171         private boolean exiting;
172
173         DispatcherTask(final @NonNull K key, final @NonNull Iterator<T> tasks) {
174             this.key = requireNonNull(key);
175             while (tasks.hasNext()) {
176                 final T task = tasks.next();
177                 if (task != null) {
178                     queue.add(task);
179                 }
180             }
181         }
182
183         @NonNull Iterator<T> recoverItems() {
184             // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
185             // get started, hence this is safe.
186             return queue.iterator();
187         }
188
189         @NonNull K key() {
190             return key;
191         }
192
193         int size() {
194             lock.lock();
195             try {
196                 return queue.size();
197             } finally {
198                 lock.unlock();
199             }
200         }
201
202         boolean submitTasks(final @NonNull Iterator<T> tasks) throws InterruptedException {
203             final long start = System.nanoTime();
204             final long deadline = start + GIVE_UP_NANOS;
205
206             lock.lock();
207             try {
208                 // Lock may have blocked for some time, we need to take that into account. We may have exceeded
209                 // the deadline, but that is unlikely and even in that case we can make some progress without further
210                 // blocking.
211                 long canWait = deadline - System.nanoTime();
212
213                 while (true) {
214                     // Check the exiting flag - if true then #run is in the process of exiting so return false
215                     // to indicate such. Otherwise, offer the tasks to the queue.
216                     if (exiting) {
217                         return false;
218                     }
219
220                     final int avail = maxQueueCapacity - queue.size();
221                     if (avail <= 0) {
222                         if (canWait <= 0) {
223                             LOG.warn("{}: Failed to offer tasks {} to the queue for worker {}. Exceeded "
224                                 + "maximum allowable time of {} minutes; the worker is likely in an unrecoverable "
225                                 + "state (deadlock or endless loop). ", getIdentifier(), ImmutableList.copyOf(tasks),
226                                 key, MAX_NOTIFICATION_OFFER_MINUTES);
227                             return true;
228                         }
229
230                         canWait = notFull.awaitNanos(canWait);
231                         continue;
232                     }
233
234                     for (int i = 0; i < avail; ++i) {
235                         if (!tasks.hasNext()) {
236                             notEmpty.signal();
237                             return true;
238                         }
239
240                         queue.add(tasks.next());
241                     }
242                 }
243             } finally {
244                 lock.unlock();
245             }
246         }
247
248         @GuardedBy("lock")
249         private boolean waitForQueue() {
250             long timeout = TASK_WAIT_NANOS;
251
252             while (queue.isEmpty()) {
253                 if (timeout <= 0) {
254                     return false;
255                 }
256
257                 try {
258                     timeout = notEmpty.awaitNanos(timeout);
259                 } catch (InterruptedException e) {
260                     // The executor is probably shutting down so log as debug.
261                     LOG.debug("{}: Interrupted trying to remove from {} worker's queue", getIdentifier(), key);
262                     return false;
263                 }
264             }
265
266             return true;
267         }
268
269         @Override
270         public void run() {
271             try {
272                 // Loop until we've dispatched all the tasks in the queue.
273                 while (true) {
274                     final @NonNull ImmutableList<T> tasks;
275
276                     lock.lock();
277                     try {
278                         if (!waitForQueue()) {
279                             exiting = true;
280                             break;
281                         }
282
283                         // Splice the entire queue
284                         tasks = ImmutableList.copyOf(queue);
285                         queue.clear();
286
287                         notFull.signalAll();
288                     } finally {
289                         lock.unlock();
290                     }
291
292                     invokeWorker(tasks);
293                 }
294             } finally {
295                 // We're exiting, gracefully or not - either way make sure we always remove
296                 // ourselves from the cache.
297                 dispatcherTasks.remove(key, this);
298             }
299         }
300
301         @SuppressWarnings("checkstyle:illegalCatch")
302         private void invokeWorker(final @NonNull ImmutableList<T> tasks) {
303             LOG.debug("{}: Invoking worker {} with tasks: {}", getIdentifier(), key, tasks);
304             try {
305                 executeBatch(key, tasks);
306             } catch (Exception e) {
307                 // We'll let a RuntimeException from the worker slide and keep sending any remaining tasks.
308                 LOG.error("{}: Error invoking worker {} with {}", getIdentifier(), key, tasks, e);
309             }
310         }
311     }
312 }