Merge branch 'master' of ../controller
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.ImmutableList;
14 import java.util.ArrayDeque;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 import java.util.stream.Collectors;
27 import org.checkerframework.checker.lock.qual.GuardedBy;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.yangtools.util.ForwardingIdentityObject;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
35  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
36  * {@link Executor}.
37  *
38  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
39  * task for a listener when there are pending notifications. On the first notification(s), a queue
40  * is created and a task is submitted to the executor to dispatch the queue to the associated
41  * listener. Any subsequent notifications that occur before all previous notifications have been
42  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
43  * queue and task are discarded.
44  *
45  * @author Thomas Pantelis
46  *
47  * @param <L> the listener type
48  * @param <N> the notification type
49  */
50 public final class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
51     @FunctionalInterface
52     public interface BatchedInvoker<L, N> {
53         /**
54          * Called to invoke a listener with a notification.
55          *
56          * @param listener the listener to invoke
57          * @param notifications notifications to send
58          */
59         void invokeListener(@NonNull L listener, @NonNull ImmutableList<N> notifications);
60     }
61
62     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
63
64     /**
65      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
66      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
67      */
68     private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
69     private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
70     private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
71
72     /**
73      * We key by listener reference identity hashCode/equals.
74      * Since we don't know anything about the listener class implementations and we're mixing
75      * multiple listener class instances in the same map, this avoids any potential issue with an
76      * equals implementation that just blindly casts the other Object to compare instead of checking
77      * for instanceof.
78      */
79     private final ConcurrentMap<ForwardingIdentityObject<L>, NotificationTask> listenerCache =
80             new ConcurrentHashMap<>();
81     private final @NonNull QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
82     private final @NonNull BatchedInvoker<L, N> listenerInvoker;
83     private final @NonNull Executor executor;
84     private final @NonNull String name;
85     private final int maxQueueCapacity;
86
87     private QueuedNotificationManager(final @NonNull Executor executor,
88             final @NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
89             final @NonNull String name) {
90         checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
91         this.executor = requireNonNull(executor);
92         this.listenerInvoker = requireNonNull(listenerInvoker);
93         this.maxQueueCapacity = maxQueueCapacity;
94         this.name = requireNonNull(name);
95     }
96
97     /**
98      * Create a new notification manager.
99      *
100      * @param executor the {@link Executor} to use for notification tasks
101      * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
102      * @param maxQueueCapacity the capacity of each listener queue
103      * @param name the name of this instance for logging info
104      */
105     public static <L, N> QueuedNotificationManager<L, N> create(final @NonNull Executor executor,
106             final@NonNull  BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
107             final @NonNull String name) {
108         return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
109     }
110
111     /**
112      * Returns the maximum listener queue capacity.
113      */
114     public int getMaxQueueCapacity() {
115         return maxQueueCapacity;
116     }
117
118     /**
119      * Return an {@link QueuedNotificationManagerMXBean} tied to this instance.
120      *
121      * @return An QueuedNotificationManagerMXBean object.
122      */
123     public @NonNull QueuedNotificationManagerMXBean getMXBean() {
124         return mxBean;
125     }
126
127     /**
128      * Returns the {@link Executor} to used for notification tasks.
129      */
130     public @NonNull Executor getExecutor() {
131         return executor;
132     }
133
134     /* (non-Javadoc)
135      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
136      */
137     @Override
138     public void submitNotification(final L listener, final N notification) {
139         if (notification != null) {
140             submitNotifications(listener, Collections.singletonList(notification));
141         }
142     }
143
144     /* (non-Javadoc)
145      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
146      */
147     @Override
148     public void submitNotifications(final L listener, final Iterable<N> notifications) {
149
150         if (notifications == null || listener == null) {
151             return;
152         }
153
154         LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
155
156         final ForwardingIdentityObject<L> key = ForwardingIdentityObject.of(listener);
157
158         // Keep looping until we are either able to add a new NotificationTask or are able to
159         // add our notifications to an existing NotificationTask. Eventually one or the other
160         // will occur.
161         try {
162             Iterator<N> it = notifications.iterator();
163
164             while (true) {
165                 NotificationTask task = listenerCache.get(key);
166                 if (task == null) {
167                     // No task found, try to insert a new one
168                     final NotificationTask newTask = new NotificationTask(key, it);
169                     task = listenerCache.putIfAbsent(key, newTask);
170                     if (task == null) {
171                         // We were able to put our new task - now submit it to the executor and
172                         // we're done. If it throws a RejectedExecutionException, let that propagate
173                         // to the caller.
174                         runTask(listener, newTask);
175                         break;
176                     }
177
178                     // We have a racing task, hence we can continue, but we need to refresh our iterator from
179                     // the task.
180                     it = newTask.recoverItems();
181                 }
182
183                 final boolean completed = task.submitNotifications(it);
184                 if (!completed) {
185                     // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
186                     // than spinning on removal, we try to replace it.
187                     final NotificationTask newTask = new NotificationTask(key, it);
188                     if (listenerCache.replace(key, task, newTask)) {
189                         runTask(listener, newTask);
190                         break;
191                     }
192
193                     // We failed to replace the task, hence we need retry. Note we have to recover the items to be
194                     // published from the new task.
195                     it = newTask.recoverItems();
196                     LOG.debug("{}: retrying task queueing for {}", name, listener);
197                     continue;
198                 }
199
200                 // All notifications have either been delivered or we have timed out and warned about the ones we
201                 // have failed to deliver. In any case we are done here.
202                 break;
203             }
204         } catch (InterruptedException e) {
205             // We were interrupted trying to offer to the listener's queue. Somebody's probably
206             // telling us to quit.
207             LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
208         }
209
210         LOG.trace("{}: submitNotifications done for listener {}", name, listener);
211     }
212
213     /**
214      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
215      * notification task in progress.
216      */
217     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
218         return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
219             t.size())).collect(Collectors.toList());
220     }
221
222     private void runTask(final L listener, final NotificationTask task) {
223         LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
224         executor.execute(task);
225     }
226
227     /**
228      * Executor task for a single listener that queues notifications and sends them serially to the
229      * listener.
230      */
231     private class NotificationTask implements Runnable {
232         private final Lock lock = new ReentrantLock();
233         private final Condition notEmpty = lock.newCondition();
234         private final Condition notFull = lock.newCondition();
235         private final @NonNull ForwardingIdentityObject<L> listenerKey;
236
237         @GuardedBy("lock")
238         private final Queue<N> queue = new ArrayDeque<>();
239         @GuardedBy("lock")
240         private boolean exiting;
241
242         NotificationTask(final @NonNull ForwardingIdentityObject<L> listenerKey,
243                 final @NonNull Iterator<N> notifications) {
244             this.listenerKey = requireNonNull(listenerKey);
245             while (notifications.hasNext()) {
246                 queue.add(notifications.next());
247             }
248         }
249
250         @NonNull Iterator<N> recoverItems() {
251             // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
252             // get started, hence this is safe.
253             return queue.iterator();
254         }
255
256         int size() {
257             lock.lock();
258             try {
259                 return queue.size();
260             } finally {
261                 lock.unlock();
262             }
263         }
264
265         boolean submitNotifications(final @NonNull Iterator<N> notifications) throws InterruptedException {
266             final long start = System.nanoTime();
267             final long deadline = start + GIVE_UP_NANOS;
268
269             lock.lock();
270             try {
271                 // Lock may have blocked for some time, we need to take that into account. We may have exceedded
272                 // the deadline, but that is unlikely and even in that case we can make some progress without further
273                 // blocking.
274                 long canWait = deadline - System.nanoTime();
275
276                 while (true) {
277                     // Check the exiting flag - if true then #run is in the process of exiting so return
278                     // false to indicate such. Otherwise, offer the notifications to the queue.
279                     if (exiting) {
280                         return false;
281                     }
282
283                     final int avail = maxQueueCapacity - queue.size();
284                     if (avail <= 0) {
285                         if (canWait <= 0) {
286                             LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
287                                 + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
288                                 + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
289                                 listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
290                             return true;
291                         }
292
293                         canWait = notFull.awaitNanos(canWait);
294                         continue;
295                     }
296
297                     for (int i = 0; i < avail; ++i) {
298                         if (!notifications.hasNext()) {
299                             notEmpty.signal();
300                             return true;
301                         }
302
303                         queue.add(notifications.next());
304                     }
305                 }
306             } finally {
307                 lock.unlock();
308             }
309         }
310
311         @GuardedBy("lock")
312         private boolean waitForQueue() {
313             long timeout = TASK_WAIT_NANOS;
314
315             while (queue.isEmpty()) {
316                 if (timeout <= 0) {
317                     return false;
318                 }
319
320                 try {
321                     timeout = notEmpty.awaitNanos(timeout);
322                 } catch (InterruptedException e) {
323                     // The executor is probably shutting down so log as debug.
324                     LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
325                     return false;
326                 }
327             }
328
329             return true;
330         }
331
332         @Override
333         public void run() {
334             try {
335                 // Loop until we've dispatched all the notifications in the queue.
336                 while (true) {
337                     final @NonNull ImmutableList<N> notifications;
338
339                     lock.lock();
340                     try {
341                         if (!waitForQueue()) {
342                             exiting = true;
343                             break;
344                         }
345
346                         // Splice the entire queue
347                         notifications = ImmutableList.copyOf(queue);
348                         queue.clear();
349
350                         notFull.signalAll();
351                     } finally {
352                         lock.unlock();
353                     }
354
355                     invokeListener(notifications);
356                 }
357             } finally {
358                 // We're exiting, gracefully or not - either way make sure we always remove
359                 // ourselves from the cache.
360                 listenerCache.remove(listenerKey, this);
361             }
362         }
363
364         @SuppressWarnings("checkstyle:illegalCatch")
365         private void invokeListener(final @NonNull ImmutableList<N> notifications) {
366             LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
367             try {
368                 listenerInvoker.invokeListener(listenerKey.getDelegate(), notifications);
369             } catch (Exception e) {
370                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
371                 LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
372             }
373         }
374     }
375 }