* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.yangtools.util.concurrent;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @param listener the listener to invoke
* @param notifications notifications to send
*/
- void invokeListener(@Nonnull L listener, @Nonnull Collection<? extends N> notifications);
+ void invokeListener(@NonNull L listener, @NonNull Collection<? extends N> notifications);
}
private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
- private final BatchedInvoker<L, N> listenerInvoker;
- private final Executor executor;
- private final String name;
+ private final @NonNull BatchedInvoker<L, N> listenerInvoker;
+ private final @NonNull Executor executor;
+ private final @NonNull String name;
private final int maxQueueCapacity;
- private QueuedNotificationManager(final Executor executor, final BatchedInvoker<L, N> listenerInvoker,
- final int maxQueueCapacity, final String name) {
+ private QueuedNotificationManager(final @NonNull Executor executor,
+ final @NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
+ final @NonNull String name) {
checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
this.executor = requireNonNull(executor);
this.listenerInvoker = requireNonNull(listenerInvoker);
*/
@Deprecated
@SuppressWarnings("checkstyle:illegalCatch")
- public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
- final int maxQueueCapacity, final String name) {
+ public QueuedNotificationManager(final @NonNull Executor executor, final @NonNull Invoker<L, N> listenerInvoker,
+ final int maxQueueCapacity, final @NonNull String name) {
this(executor, (BatchedInvoker<L, N>)(listener, notifications) -> notifications.forEach(n -> {
try {
listenerInvoker.invokeListener(listener, n);
* @param maxQueueCapacity the capacity of each listener queue
* @param name the name of this instance for logging info
*/
- public static <L, N> QueuedNotificationManager<L, N> create(final Executor executor,
- final BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity, final String name) {
+ public static <L, N> QueuedNotificationManager<L, N> create(final @NonNull Executor executor,
+ final@NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
+ final @NonNull String name) {
return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
}
/**
* Returns the {@link Executor} to used for notification tasks.
*/
- public Executor getExecutor() {
+ public @NonNull Executor getExecutor() {
return executor;
}
* for instanceof.
*/
private static final class ListenerKey<L> {
- private final L listener;
+ private final @NonNull L listener;
ListenerKey(final L listener) {
this.listener = requireNonNull(listener);
}
- L getListener() {
+ @NonNull L getListener() {
return listener;
}
@Override
public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
- return obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
+ return obj == this || obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
}
@Override
* listener.
*/
private class NotificationTask implements Runnable {
-
private final Lock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
- private final ListenerKey<L> listenerKey;
+ private final @NonNull ListenerKey<L> listenerKey;
@GuardedBy("lock")
private final Queue<N> queue = new ArrayDeque<>();
@GuardedBy("lock")
private boolean exiting;
- NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
+ NotificationTask(final @NonNull ListenerKey<L> listenerKey, final @NonNull Iterator<N> notifications) {
this.listenerKey = requireNonNull(listenerKey);
while (notifications.hasNext()) {
queue.add(notifications.next());
}
}
- Iterator<N> recoverItems() {
+ @NonNull Iterator<N> recoverItems() {
// This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
// get started, hence this is safe.
return queue.iterator();
}
}
- boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
+ boolean submitNotifications(final @NonNull Iterator<N> notifications) throws InterruptedException {
final long start = System.nanoTime();
final long deadline = start + GIVE_UP_NANOS;
try {
// Loop until we've dispatched all the notifications in the queue.
while (true) {
- final Collection<N> notifications;
+ final @NonNull Collection<N> notifications;
lock.lock();
try {
}
@SuppressWarnings("checkstyle:illegalCatch")
- private void invokeListener(final Collection<N> notifications) {
+ private void invokeListener(final @NonNull Collection<N> notifications) {
LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
try {
listenerInvoker.invokeListener(listenerKey.getListener(), notifications);