From ead44a5baba1149998c1bbc369f1ddbd1e8dc7a5 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Sat, 26 Jul 2014 20:11:33 -0400 Subject: [PATCH] Bug 1430: New common/util concurrent classes AsyncNotifyingListeningExecutorService: A ListeningExecutorService that executes ListenableFuture callback Runnables on another executor to off-load them. AsyncNotifyingListenableFutureTask Listenable FutureTask used by AsyncNotifyingListeningExecutorService. QueuedNotificationManager Manages queuing and dispatching notifications for multiple listeners concurrently. Notifications are queued on a per-listener basis and dispatched serially to each listener via an Executor. FastThreadPoolExecutor Executor with a bounded queue that provides fast task execution time by favoring creating threads over queuing. CachedThreadPoolExecutor Executor with a bounded queue that favors reusing existing idle threads over creating new threads at the expense of slower task execution time. SpecialExecutors Factory methods for creating Fast/CachedThreadPoolExecutor instances. Change-Id: Ie0d2f4d4e0561976c6956d551e386da78631aafa Signed-off-by: tpantelis --- common/util/pom.xml | 11 +- .../yangtools/util/ExecutorServiceUtil.java | 36 +- .../yangtools/util/PropertyUtils.java | 51 +++ .../AsyncNotifyingListenableFutureTask.java | 154 +++++++ ...syncNotifyingListeningExecutorService.java | 150 +++++++ .../concurrent/CachedThreadPoolExecutor.java | 229 +++++++++++ ...lockDetectingListeningExecutorService.java | 91 ++-- .../concurrent/FastThreadPoolExecutor.java | 104 +++++ .../util/concurrent/NotificationManager.java | 47 +++ .../concurrent/QueuedNotificationManager.java | 389 ++++++++++++++++++ .../util/concurrent/SpecialExecutors.java | 154 +++++++ ...NotifyingListeningExecutorServiceTest.java | 200 +++++++++ .../util/concurrent/CommonTestUtils.java | 74 ++++ ...DetectingListeningExecutorServiceTest.java | 92 +++-- .../QueuedNotificationManagerTest.java | 301 ++++++++++++++ .../concurrent/ThreadPoolExecutorTest.java | 205 +++++++++ 16 files changed, 2185 insertions(+), 103 deletions(-) create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/PropertyUtils.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListeningExecutorService.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java create mode 100644 common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListeningExecutorServiceTest.java create mode 100644 common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CommonTestUtils.java create mode 100644 common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java create mode 100644 common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java diff --git a/common/util/pom.xml b/common/util/pom.xml index d0ab7fc85e..5d185efe51 100644 --- a/common/util/pom.xml +++ b/common/util/pom.xml @@ -24,7 +24,10 @@ org.opendaylight.yangtools concepts - + + com.google.code.findbugs + jsr305 + org.slf4j slf4j-api @@ -43,6 +46,12 @@ junit test + + org.opendaylight.yangtools + mockito-configuration + test + 0.6.2-SNAPSHOT + diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/ExecutorServiceUtil.java b/common/util/src/main/java/org/opendaylight/yangtools/util/ExecutorServiceUtil.java index 36f729fb8f..51a8d16f84 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/ExecutorServiceUtil.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/ExecutorServiceUtil.java @@ -12,6 +12,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,11 +26,15 @@ public final class ExecutorServiceUtil { private static final class WaitInQueueExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { + if( executor.isShutdown() ) { + throw new RejectedExecutionException( "Executor has been shutdown." ); + } + try { executor.getQueue().put(r); } catch (InterruptedException e) { - LOG.debug("Intterupted while waiting for queue", e); - throw new RejectedExecutionException("Interrupted while waiting for queue", e); + LOG.debug("Interrupted while attempting to put to the queue", e); + throw new RejectedExecutionException("Interrupted while attempting to put to the queue", e); } } } @@ -42,7 +47,7 @@ public final class ExecutorServiceUtil { } /** - * Create a {@link BlockingQueue} which does not allow for non-blocking addition to the queue. + * Creates a {@link BlockingQueue} which does not allow for non-blocking addition to the queue. * This is useful with {@link #waitInQueueExecutionHandler()} to turn force a * {@link ThreadPoolExecutor} to create as many threads as it is configured to before starting * to fill the queue. @@ -50,7 +55,7 @@ public final class ExecutorServiceUtil { * @param delegate Backing blocking queue. * @return A new blocking queue backed by the delegate */ - public BlockingQueue offerFailingBlockingQueue(final BlockingQueue delegate) { + public static BlockingQueue offerFailingBlockingQueue(final BlockingQueue delegate) { return new ForwardingBlockingQueue() { @Override public boolean offer(final E o) { @@ -65,12 +70,31 @@ public final class ExecutorServiceUtil { } /** - * Return a {@link RejectedExecutionHandler} which blocks on the {@link ThreadPoolExecutor}'s + * Returns a {@link RejectedExecutionHandler} which blocks on the {@link ThreadPoolExecutor}'s * backing queue if a new thread cannot be spawned. * * @return A shared RejectedExecutionHandler instance. */ - public RejectedExecutionHandler waitInQueueExecutionHandler() { + public static RejectedExecutionHandler waitInQueueExecutionHandler() { return WAIT_IN_QUEUE_HANDLER; } + + /** + * Tries to shutdown the given executor gracefully by awaiting termination for the given + * timeout period. If the timeout elapses before termination, the executor is forcefully + * shutdown. + */ + public static void tryGracefulShutdown(final ExecutorService executor, long timeout, + TimeUnit unit ) { + + executor.shutdown(); + + try { + if(!executor.awaitTermination(timeout, unit)) { + executor.shutdownNow(); + } + } catch( InterruptedException e ) { + executor.shutdownNow(); + } + } } diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/PropertyUtils.java b/common/util/src/main/java/org/opendaylight/yangtools/util/PropertyUtils.java new file mode 100644 index 0000000000..cc8f94d182 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/PropertyUtils.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; + +/** + * Provides utilities for system properties. + * + * @author Thomas Pantelis + */ +public final class PropertyUtils { + + private static final Logger LOG = LoggerFactory.getLogger(PropertyUtils.class); + + private PropertyUtils() { + } + + /** + * Obtains the given property from the System properties and returns as an int. If the property + * is not found the specified default value is returned. If the property value can't be parsed + * to an int, a warning is logged and the default value is returned. + * + * @param propName the name of the property to get + * @param defaultValue the default value + * @return the System property as an int or the defaultValue if not found. + */ + public static int getIntSystemProperty( String propName, int defaultValue ) { + int propValue = defaultValue; + String strValue = System.getProperty(propName); + if (!Strings.isNullOrEmpty(strValue) && !strValue.trim().isEmpty() ) { + try { + propValue = Integer.parseInt(strValue); + } catch (NumberFormatException e) { + LOG.warn("Cannot parse value {} for system property {}, using default {}", + strValue, propName, defaultValue); + } + } + + return propValue; + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java new file mode 100644 index 0000000000..69c94f32a3 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to + * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it + * allows an {@link Executor} to be specified on construction that is used to execute listener + * callback Runnables, registered via {@link #addListener}, asynchronously when this task completes. + * This is useful when you want to guarantee listener executions are off-loaded onto another thread + * to avoid blocking the thread that completed this task, as a common use case is to pass an + * executor that runs tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor) + * to {@link #addListener}. + *

+ * Note: the Executor specified on construction does not replace the Executor specified in + * {@link #addListener}. The latter Executor is still used however, if it is detected that the + * listener Runnable would execute in the thread that completed this task, the listener + * is executed on Executor specified on construction. + * + * @author Thomas Pantelis + * + * @param the Future result value type + */ +public class AsyncNotifyingListenableFutureTask extends FutureTask implements ListenableFuture { + + private static final Logger LOG = LoggerFactory.getLogger( AsyncNotifyingListenableFutureTask.class ); + + /** + * ThreadLocal used to detect if the task completion thread is running the listeners. + */ + private static final ThreadLocal ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<>(); + + /** + * The execution list to hold our listeners. + */ + private final ExecutionList executionList = new ExecutionList(); + + /** + * The executor used to run listener callbacks. + */ + private final Executor listenerExecutor; + + private AsyncNotifyingListenableFutureTask( Callable callable, @Nullable Executor listenerExecutor ) { + super( callable ); + this.listenerExecutor = listenerExecutor; + } + + private AsyncNotifyingListenableFutureTask( Runnable runnable, @Nullable V result, + @Nullable Executor listenerExecutor ) { + super( runnable, result ); + this.listenerExecutor = listenerExecutor; + } + + /** + * Creates an {@code AsyncListenableFutureTask} that will upon running, execute the given + * {@code Callable}. + * + * @param callable the callable task + * @param listenerExecutor the executor used to run listener callbacks asynchronously. + * If null, no executor is used. + */ + public static AsyncNotifyingListenableFutureTask create( Callable callable, + @Nullable Executor listenerExecutor ) { + return new AsyncNotifyingListenableFutureTask( callable, listenerExecutor ); + } + + /** + * Creates a {@code AsyncListenableFutureTask} that will upon running, execute the + * given {@code Runnable}, and arrange that {@code get} will return the + * given result on successful completion. + * + * @param runnable the runnable task + * @param result the result to return on successful completion. + * @param listenerExecutor the executor used to run listener callbacks asynchronously. + * If null, no executor is used. + */ + public static AsyncNotifyingListenableFutureTask create( Runnable runnable, @Nullable V result, + @Nullable Executor listenerExecutor ) { + return new AsyncNotifyingListenableFutureTask( runnable, result, listenerExecutor ); + } + + @Override + public void addListener( Runnable listener, Executor executor ) { + // If a listenerExecutor was specified on construction, wrap the listener Runnable in a + // DelegatingRunnable. If the specified executor is one that runs tasks in the same thread + // as the caller submitting the task (eg MoreExecutors#sameThreadExecutor) and the + // listener is executed from the #done method, then the DelegatingRunnable will detect this + // via the ThreadLocal and submit the listener Runnable to the listenerExecutor. + // + // On the other hand, if this task is already complete, the call to ExecutionList#add below + // will execute the listener Runnable immediately and, since the ThreadLocal won't be set, + // the DelegatingRunnable will run the listener Runnable inline. + + executionList.add( listenerExecutor == null ? listener : + new DelegatingRunnable( listener, listenerExecutor ), executor ); + } + + /** + * Called by the base class when the future result is set. We invoke our listeners. + */ + @Override + protected void done() { + ON_TASK_COMPLETION_THREAD_TL.set( Boolean.TRUE ); + try { + executionList.execute(); + } finally { + ON_TASK_COMPLETION_THREAD_TL.remove(); + } + } + + private static class DelegatingRunnable implements Runnable { + + private final Runnable delegate; + private final Executor executor; + + DelegatingRunnable( Runnable delegate, Executor executor ) { + this.delegate = delegate; + this.executor = executor; + } + + @Override + public void run() { + if( ON_TASK_COMPLETION_THREAD_TL.get() == null ) { + // We're not running on the task completion thread so run the delegate inline. + LOG.trace( "Executing ListenenableFuture Runnable on this thread: {}", + Thread.currentThread().getName() ); + delegate.run(); + } else { + // We're running on the task completion thread so off-load to the executor. + LOG.trace( "Submitting ListenenableFuture Runnable to the listenerExecutor", + Thread.currentThread().getName() ); + executor.execute( delegate ); + } + } + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListeningExecutorService.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListeningExecutorService.java new file mode 100644 index 0000000000..ef4670d9c5 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListeningExecutorService.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +import com.google.common.base.Objects; +import com.google.common.base.Objects.ToStringHelper; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractListeningExecutorService; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * An {@link ListeningExecutorService} implementation that also allows for an {@link Executor} to be + * specified on construction that is used to execute {@link ListenableFuture} callback Runnables, + * registered via {@link Futures#addCallback} or {@link ListenableFuture#addListener} directly, + * asynchronously when a task that is run on this executor completes. This is useful when you want + * to guarantee listener callback executions are off-loaded onto another thread to avoid blocking + * the thread that completed the task, as a common use case is to pass an executor that runs tasks + * in the same thread as the caller (ie MoreExecutors#sameThreadExecutor}) to + * {@link ListenableFuture#addListener}. + *

+ * Most commonly, this class would be used in lieu of MoreExecutors#listeningDecorator + * when the underlying delegate Executor is single-threaded, in which case, you may not want + * ListenableFuture callbacks to block the single thread. + *

+ * Note: the Executor specified on construction does not replace the Executor specified in + * {@link ListenableFuture#addListener}. The latter Executor is still used however, if it is + * detected that the listener Runnable would execute in the thread that completed the task, the + * listener is executed on Executor specified on construction. + * + * @author Thomas Pantelis + * @see AsyncNotifyingListenableFutureTask + */ +public class AsyncNotifyingListeningExecutorService extends AbstractListeningExecutorService { + + private final ExecutorService delegate; + private final Executor listenableFutureExecutor; + + /** + * Constructor. + * + * @param delegate the back-end ExecutorService. + * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously. + * If null, no executor is used. + */ + public AsyncNotifyingListeningExecutorService( ExecutorService delegate, + @Nullable Executor listenableFutureExecutor ) { + this.delegate = Preconditions.checkNotNull( delegate ); + this.listenableFutureExecutor = listenableFutureExecutor; + } + + /** + * Creates an {@link AsyncNotifyingListenableFutureTask} instance with the listener Executor. + * + * @param task the Callable to execute + */ + private AsyncNotifyingListenableFutureTask newFutureTask( Callable task ) { + return AsyncNotifyingListenableFutureTask.create( task, listenableFutureExecutor ); + } + + /** + * Creates an {@link AsyncNotifyingListenableFutureTask} instance with the listener Executor. + * + * @param task the Runnable to execute + */ + private AsyncNotifyingListenableFutureTask newFutureTask( Runnable task, T result ) { + return AsyncNotifyingListenableFutureTask.create( task, result, listenableFutureExecutor ); + } + + /** + * Returns the delegate ExecutorService. + */ + protected ExecutorService getDelegate() { + return delegate; + } + + @Override + public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException { + return delegate.awaitTermination( timeout, unit ); + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public void shutdown() { + delegate.shutdown(); + } + + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + @Override + public void execute( Runnable command ) { + delegate.execute( command ); + } + + @Override + public ListenableFuture submit( Callable task ) { + AsyncNotifyingListenableFutureTask futureTask = newFutureTask( task ); + delegate.execute( futureTask ); + return futureTask; + } + + @Override + public ListenableFuture submit( Runnable task ) { + AsyncNotifyingListenableFutureTask futureTask = newFutureTask( task, null ); + delegate.execute( futureTask ); + return futureTask; + } + + @Override + public ListenableFuture submit( Runnable task, T result ) { + AsyncNotifyingListenableFutureTask futureTask = newFutureTask( task, result ); + delegate.execute( futureTask ); + return futureTask; + } + + protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) { + return toStringHelper; + } + + @Override + public final String toString(){ + return addToStringAttributes( Objects.toStringHelper( this ) + .add( "delegate", delegate ) ).toString(); + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java new file mode 100644 index 0000000000..4936efaba1 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.yangtools.util.concurrent; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.Objects; +import com.google.common.base.Objects.ToStringHelper; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously + * constructed threads, when they are available, over creating new threads. + *

+ * See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details. + * + * @author Thomas Pantelis + */ +public class CachedThreadPoolExecutor extends ThreadPoolExecutor { + + private static final long IDLE_TIMEOUT_IN_SEC = 60L; + + private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 ); + + private final ExecutorQueue executorQueue; + + private final String threadPrefix; + + private final int maximumQueueSize; + + private final RejectedTaskHandler rejectedTaskHandler; + + /** + * Constructs an instance. + * + * @param maximumPoolSize + * the maximum number of threads to allow in the pool. Threads will terminate after + * being idle for 60 seconds. + * @param maximumQueueSize + * the capacity of the queue. + * @param threadPrefix + * the name prefix for threads created by this executor. + */ + public CachedThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) { + // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue. + // We don't specify any core threads (first parameter) so, when a task is submitted, + // the base class will always try to offer to the queue. If there is an existing waiting + // thread, the offer will succeed and the task will be handed to the thread to execute. If + // there's no waiting thread, either because there are no threads in the pool or all threads + // are busy, the base class will try to create a new thread. If the maximum thread limit has + // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries + // to offer to the backing queue. If that succeeds, the task will execute as soon as a + // thread becomes available. If the offer fails to the backing queue, the task is rejected. + super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS, + new ExecutorQueue( maximumQueueSize ) ); + + this.threadPrefix = Preconditions.checkNotNull( threadPrefix ); + this.maximumQueueSize = maximumQueueSize; + + setThreadFactory( new ThreadFactoryBuilder().setDaemon( true ) + .setNameFormat( this.threadPrefix + "-%d" ).build() ); + + executorQueue = (ExecutorQueue)super.getQueue(); + + rejectedTaskHandler = new RejectedTaskHandler( + executorQueue.getBackingQueue(), largestBackingQueueSize ); + super.setRejectedExecutionHandler( rejectedTaskHandler ); + } + + @Override + public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) { + rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler ); + } + + @Override + public BlockingQueue getQueue(){ + return executorQueue.getBackingQueue(); + } + + public long getLargestQueueSize() { + return largestBackingQueueSize.get(); + } + + protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) { + return toStringHelper; + } + + @Override + public final String toString() { + return addToStringAttributes( Objects.toStringHelper( this ) + .add( "Thread Prefix", threadPrefix ) + .add( "Current Thread Pool Size", getPoolSize() ) + .add( "Largest Thread Pool Size", getLargestPoolSize() ) + .add( "Max Thread Pool Size", getMaximumPoolSize() ) + .add( "Current Queue Size", executorQueue.getBackingQueue().size() ) + .add( "Largest Queue Size", getLargestQueueSize() ) + .add( "Max Queue Size", maximumQueueSize ) + .add( "Active Thread Count", getActiveCount() ) + .add( "Completed Task Count", getCompletedTaskCount() ) + .add( "Total Task Count", getTaskCount() ) ).toString(); + } + + /** + * A customized SynchronousQueue that has a backing bounded LinkedBlockingQueue. This class + * overrides the #poll methods to first try to poll the backing queue for a task. If the backing + * queue is empty, it calls the base SynchronousQueue#poll method. In this manner, we get the + * thread reuse behavior of the SynchronousQueue with the added ability to queue tasks when all + * threads are busy. + */ + private static class ExecutorQueue extends SynchronousQueue { + + private static final long serialVersionUID = 1L; + + private static final long POLL_WAIT_TIME_IN_MS = 300; + + private final LinkedBlockingQueue backingQueue; + + ExecutorQueue( int maxBackingQueueSize ) { + backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize ); + } + + LinkedBlockingQueue getBackingQueue() { + return backingQueue; + } + + @Override + public Runnable poll( long timeout, TimeUnit unit ) throws InterruptedException { + long totalWaitTime = unit.toMillis( timeout ); + long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS ); + Runnable task = null; + + // We loop here, each time polling the backingQueue first then our queue, instead of + // polling each just once. This is to handle the following timing edge case: + // + // We poll the backingQueue and it's empty but, before the call to super.poll, + // a task is offered but no thread is immediately available and the task is put on the + // backingQueue. There is a slight chance that all the other threads could be at the + // same point, in which case they would all call super.poll and wait. If we only + // called poll once, no thread would execute the task (unless/until another task was + // later submitted). But by looping and breaking the specified timeout into small + // periods, one thread will eventually wake up and get the task from the backingQueue + // and execute it, although slightly delayed. + + while( task == null ) { + // First try to get a task from the backing queue. + task = backingQueue.poll(); + if( task == null ) { + // No task in backing - call the base class to wait for one to be offered. + task = super.poll( waitTime, TimeUnit.MILLISECONDS ); + + totalWaitTime -= POLL_WAIT_TIME_IN_MS; + if( totalWaitTime <= 0 ) { + break; + } + + waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS ); + } + } + + return task; + } + + @Override + public Runnable poll() { + Runnable task = backingQueue.poll(); + return task != null ? task : super.poll(); + } + } + + /** + * Internal RejectedExecutionHandler that tries to offer rejected tasks to the backing queue. + * If the queue is full, we throw a RejectedExecutionException by default. The client can + * override this behavior be specifying their own RejectedExecutionHandler, in which case we + * delegate to that handler. + */ + private static class RejectedTaskHandler implements RejectedExecutionHandler { + + private final LinkedBlockingQueue backingQueue; + private final AtomicLong largestBackingQueueSize; + private volatile RejectedExecutionHandler delegateRejectedExecutionHandler; + + RejectedTaskHandler( LinkedBlockingQueue backingQueue, + AtomicLong largestBackingQueueSize ) { + this.backingQueue = backingQueue; + this.largestBackingQueueSize = largestBackingQueueSize; + } + + void setDelegateRejectedExecutionHandler( + RejectedExecutionHandler delegateRejectedExecutionHandler ){ + this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler; + } + + @Override + public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) { + if( executor.isShutdown() ) { + throw new RejectedExecutionException( "Executor has been shutdown." ); + } + + if( !backingQueue.offer( task ) ) { + if( delegateRejectedExecutionHandler != null ) { + delegateRejectedExecutionHandler.rejectedExecution( task, executor ); + } else { + throw new RejectedExecutionException( + "All threads are in use and the queue is full" ); + } + } + + largestBackingQueueSize.incrementAndGet(); + long size = backingQueue.size(); + long largest = largestBackingQueueSize.get(); + if( size > largest ) { + largestBackingQueueSize.compareAndSet( largest, size ); + } + } + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java index 39332be85e..011872d6b1 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java @@ -11,18 +11,17 @@ package org.opendaylight.yangtools.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Function; -import com.google.common.util.concurrent.AbstractListeningExecutorService; import com.google.common.util.concurrent.ForwardingListenableFuture; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; - -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nullable; + /** * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that * could occur if clients invoke the returned Future's get methods synchronously. @@ -45,10 +44,9 @@ import java.util.concurrent.TimeoutException; * * @author Thomas Pantelis */ -public class DeadlockDetectingListeningExecutorService extends AbstractListeningExecutorService { +public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService { private final ThreadLocal deadlockDetector = new ThreadLocal<>(); private final Function deadlockExceptionFunction; - private final ExecutorService delegate; /** * Constructor. @@ -57,61 +55,45 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening * @param deadlockExceptionFunction Function that returns an Exception instance to set as the * cause of the ExecutionException when a deadlock is detected. */ - public DeadlockDetectingListeningExecutorService(final ExecutorService delegate, - final Function deadlockExceptionFunction) { - this.delegate = checkNotNull(delegate); - this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction); - } - - @Override - public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { - return delegate.awaitTermination(timeout, unit); - } - - @Override - public boolean isShutdown() { - return delegate.isShutdown(); - } - - @Override - public boolean isTerminated() { - return delegate.isTerminated(); + public DeadlockDetectingListeningExecutorService( ExecutorService delegate, + Function deadlockExceptionFunction ) { + this(delegate, deadlockExceptionFunction, null); } - @Override - public void shutdown() { - delegate.shutdown(); - } - - @Override - public List shutdownNow() { - return delegate.shutdownNow(); + /** + * Constructor. + * + * @param delegate the backing ExecutorService. + * @param deadlockExceptionFunction Function that returns an Exception instance to set as the + * cause of the ExecutionException when a deadlock is detected. + * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously. + * If null, no executor is used. + */ + public DeadlockDetectingListeningExecutorService( ExecutorService delegate, + Function deadlockExceptionFunction, + @Nullable Executor listenableFutureExecutor ) { + super(delegate, listenableFutureExecutor); + this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction); } @Override - public void execute(final Runnable command) { - delegate.execute(wrapRunnable(command)); + public void execute( Runnable command ){ + getDelegate().execute(wrapRunnable(command)); } @Override - public ListenableFuture submit(final Callable task ) { - final ListenableFutureTask futureTask = ListenableFutureTask.create(wrapCallable(task)); - delegate.execute(futureTask); - return wrapListenableFuture(futureTask); + public ListenableFuture submit( Callable task ){ + return wrapListenableFuture(super.submit(wrapCallable(task))); } @Override - public ListenableFuture submit( final Runnable task ) { - ListenableFutureTask futureTask = ListenableFutureTask.create(wrapRunnable(task), null); - delegate.execute(futureTask); - return wrapListenableFuture(futureTask); + public ListenableFuture submit( Runnable task ){ + return wrapListenableFuture(super.submit(wrapRunnable(task))); } @Override - public ListenableFuture submit(final Runnable task, final T result) { - ListenableFutureTask futureTask = ListenableFutureTask.create(wrapRunnable(task), result); - delegate.execute(futureTask); - return wrapListenableFuture(futureTask); + public ListenableFuture submit( Runnable task, T result ){ + return wrapListenableFuture(super.submit(wrapRunnable(task), result)); } private Runnable wrapRunnable(final Runnable task) { @@ -122,7 +104,7 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening try { task.run(); } finally { - deadlockDetector.set(null); + deadlockDetector.remove(); } } }; @@ -136,7 +118,7 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening try { return delagate.call(); } finally { - deadlockDetector.set(null); + deadlockDetector.remove(); } } }; @@ -144,11 +126,12 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening private ListenableFuture wrapListenableFuture(final ListenableFuture delegate ) { /* - * This creates a forwarding Future that overrides calls to get(...) to check, via the ThreadLocal, - * if the caller is doing a blocking call on a thread from this executor. If so, we detect this as - * a deadlock and throw an ExecutionException even though it may not be a deadlock if there are - * more than 1 thread in the pool. Either way, there's bad practice somewhere, either on the client - * side for doing a blocking call or in the framework's threading model. + * This creates a forwarding Future that overrides calls to get(...) to check, via the + * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If + * so, we detect this as a deadlock and throw an ExecutionException even though it may not + * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad + * practice somewhere, either on the client side for doing a blocking call or in the + * framework's threading model. */ return new ForwardingListenableFuture.SimpleForwardingListenableFuture(delegate) { @Override diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java new file mode 100644 index 0000000000..b7549eb24e --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Objects; +import com.google.common.base.Objects.ToStringHelper; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A ThreadPoolExecutor with a specified bounded queue capacity that favors creating new threads + * over queuing, as the former is faster. + *

+ * See {@link SpecialExecutors#newFastBlockingThreadPool} for more details. + * + * @author Thomas Pantelis + */ +public class FastThreadPoolExecutor extends ThreadPoolExecutor { + + private static final long DEFAULT_IDLE_TIMEOUT_IN_SEC = 15L; + + private final String threadPrefix; + private final int maximumQueueSize; + + /** + * Constructs a FastThreadPoolExecutor instance. + * + * @param maximumPoolSize + * the maximum number of threads to allow in the pool. Threads will terminate after + * being idle for 15 seconds. + * @param maximumQueueSize + * the capacity of the queue. + * @param threadPrefix + * the name prefix for threads created by this executor. + */ + public FastThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) { + this( maximumPoolSize, maximumQueueSize, DEFAULT_IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS, + threadPrefix ); + } + + /** + * Constructs a FastThreadPoolExecutor instance. + * + * @param maximumPoolSize + * the maximum number of threads to allow in the pool. + * @param maximumQueueSize + * the capacity of the queue. + * @param keepAliveTime + * the maximum time that idle threads will wait for new tasks before terminating. + * @param unit + * the time unit for the keepAliveTime argument + * @param threadPrefix + * the name prefix for threads created by this executor. + */ + public FastThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, long keepAliveTime, + TimeUnit unit, String threadPrefix ) { + // We use all core threads (the first 2 parameters below equal) so, when a task is submitted, + // if the thread limit hasn't been reached, a new thread will be spawned to execute + // the task even if there is an existing idle thread in the pool. This is faster than + // handing the task to an existing idle thread via the queue. Once the thread limit is + // reached, subsequent tasks will be queued. If the queue is full, tasks will be rejected. + + super( maximumPoolSize, maximumPoolSize, keepAliveTime, unit, + new LinkedBlockingQueue( maximumQueueSize ) ); + + this.threadPrefix = threadPrefix; + this.maximumQueueSize = maximumQueueSize; + + setThreadFactory( new ThreadFactoryBuilder().setDaemon( true ) + .setNameFormat( threadPrefix + "-%d" ).build() ); + + if( keepAliveTime > 0 ) { + // Need to specifically configure core threads to timeout. + allowCoreThreadTimeOut( true ); + } + } + + protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) { + return toStringHelper; + } + + @Override + public final String toString() { + return addToStringAttributes( Objects.toStringHelper( this ) + .add( "Thread Prefix", threadPrefix ) + .add( "Current Thread Pool Size", getPoolSize() ) + .add( "Largest Thread Pool Size", getLargestPoolSize() ) + .add( "Max Thread Pool Size", getMaximumPoolSize() ) + .add( "Current Queue Size", getQueue().size() ) + .add( "Max Queue Size", maximumQueueSize ) + .add( "Active Thread Count", getActiveCount() ) + .add( "Completed Task Count", getCompletedTaskCount() ) + .add( "Total Task Count", getTaskCount() ) ).toString(); + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java new file mode 100644 index 0000000000..41cc7dcc0e --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.concurrent.RejectedExecutionException; + +/** + * Interface for a class that manages queuing and dispatching notifications for multiple listeners. + * + * @author Thomas Pantelis + * + * @param the listener type + * @param the notification type + */ +public interface NotificationManager { + + /** + * Submits a notification to be queued and dispatched to the given listener. + *

+ * Note: This method may block if the listener queue is currently full. + * + * @param listener the listener to notify + * @param notification the notification to dispatch + * @throws RejectedExecutionException if the notification can't be queued for dispatching + */ + void submitNotification( L listener, N notification ) + throws RejectedExecutionException; + + /** + * Submits notifications to be queued and dispatched to the given listener. + *

+ * Note: This method may block if the listener queue is currently full. + * + * @param listener the listener to notify + * @param notifications the notifications to dispatch + * @throws RejectedExecutionException if a notification can't be queued for dispatching + */ + void submitNotifications( L listener, Iterable notifications ) + throws RejectedExecutionException; + +} \ No newline at end of file diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java new file mode 100644 index 0000000000..472520d998 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java @@ -0,0 +1,389 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.Arrays; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.annotation.concurrent.GuardedBy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * This class manages queuing and dispatching notifications for multiple listeners concurrently. + * Notifications are queued on a per-listener basis and dispatched serially to each listener via an + * {@link Executor}. + *

+ * This class optimizes its memory footprint by only allocating and maintaining a queue and executor + * task for a listener when there are pending notifications. On the first notification(s), a queue + * is created and a task is submitted to the executor to dispatch the queue to the associated + * listener. Any subsequent notifications that occur before all previous notifications have been + * dispatched are appended to the existing queue. When all notifications have been dispatched, the + * queue and task are discarded. + * + * @author Thomas Pantelis + * + * @param the listener type + * @param the notification type + */ +public class QueuedNotificationManager implements NotificationManager { + + /** + * Interface implemented by clients that does the work of invoking listeners with notifications. + * + * @author Thomas Pantelis + * + * @param the listener type + * @param the notification type + */ + public interface Invoker { + + /** + * Called to invoke a listener with a notification. + * + * @param listener the listener to invoke + * @param notification the notification to send + */ + void invokeListener( L listener, N notification ); + } + + private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class ); + + private final Executor executor; + private final Invoker listenerInvoker; + + private final ConcurrentMap,NotificationTask> + listenerCache = new ConcurrentHashMap<>(); + + private final String name; + private final int maxQueueCapacity; + + /** + * Constructor. + * + * @param executor the {@link Executor} to use for notification tasks + * @param listenerInvoker the {@link Invoker} to use for invoking listeners + * @param maxQueueCapacity the capacity of each listener queue + * @param name the name of this instance for logging info + */ + public QueuedNotificationManager( Executor executor, Invoker listenerInvoker, + int maxQueueCapacity, String name ) { + this.executor = Preconditions.checkNotNull( executor ); + this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker ); + Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " ); + this.maxQueueCapacity = maxQueueCapacity; + this.name = Preconditions.checkNotNull( name ); + } + + /* (non-Javadoc) + * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N) + */ + @Override + public void submitNotification( final L listener, final N notification ) + throws RejectedExecutionException { + + if( notification == null ) { + return; + } + + submitNotifications( listener, Arrays.asList( notification ) ); + } + + /* (non-Javadoc) + * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection) + */ + @Override + public void submitNotifications( final L listener, final Iterable notifications ) + throws RejectedExecutionException { + + if( notifications == null || listener == null ) { + return; + } + + if( LOG.isTraceEnabled() ) { + LOG.trace( "{}: submitNotifications for listener {}: {}", + name, listener.getClass(), notifications ); + } + + ListenerKey key = new ListenerKey<>( listener ); + NotificationTask newNotificationTask = null; + + // Keep looping until we are either able to add a new NotificationTask or are able to + // add our notifications to an existing NotificationTask. Eventually one or the other + // will occur. + + try { + while( true ) { + NotificationTask existingTask = listenerCache.get( key ); + + if( existingTask == null || !existingTask.submitNotifications( notifications ) ) { + + // Either there's no existing task or we couldn't add our notifications to the + // existing one because it's in the process of exiting and removing itself from + // the cache. Either way try to put a new task in the cache. If we can't put + // then either the existing one is still there and hasn't removed itself quite + // yet or some other concurrent thread beat us to the put although this method + // shouldn't be called concurrently for the same listener as that would violate + // notification ordering. In any case loop back up and try again. + + if( newNotificationTask == null ) { + newNotificationTask = new NotificationTask( key, notifications ); + } + + existingTask = listenerCache.putIfAbsent( key, newNotificationTask ); + if( existingTask == null ) { + + // We were able to put our new task - now submit it to the executor and + // we're done. If it throws a RejectedxecutionException, let that propagate + // to the caller. + + LOG.debug( "{}: Submitting NotificationTask for listener {}", + name, listener.getClass() ); + + executor.execute( newNotificationTask ); + break; + } + } else { + + // We were able to add our notifications to an existing task so we're done. + + break; + } + } + } catch( InterruptedException e ) { + + // We were interrupted trying to offer to the listener's queue. Somebody's probably + // telling us to quit. + + LOG.debug( "{}: Interrupted trying to add to {} listener's queue", + name, listener.getClass() ); + } + + if( LOG.isTraceEnabled() ) { + LOG.trace( "{}: submitNotifications dine for listener {}", + name, listener.getClass() ); + } + } + + /** + * Used as the listenerCache map key. We key by listener reference identity hashCode/equals. + * Since we don't know anything about the listener class implementations and we're mixing + * multiple listener class instances in the same map, this avoids any potential issue with an + * equals implementation that just blindly casts the other Object to compare instead of checking + * for instanceof. + */ + private static class ListenerKey { + + private final L listener; + + public ListenerKey( L listener ) { + this.listener = listener; + } + + L getListener() { + return listener; + } + + @Override + public int hashCode() { + return System.identityHashCode( listener ); + } + + @Override + public boolean equals( Object obj ) { + ListenerKey other = (ListenerKey) obj; + return listener == other.listener; + } + } + + /** + * Executor task for a single listener that queues notifications and sends them serially to the + * listener. + */ + private class NotificationTask implements Runnable { + + private final BlockingQueue notificationQueue; + + private volatile boolean done = false; + + @GuardedBy("queuingLock") + private boolean queuedNotifications = false; + + private final Lock queuingLock = new ReentrantLock(); + + private final ListenerKey listenerKey; + + NotificationTask( ListenerKey listenerKey, Iterable notifications ) { + + this.listenerKey = listenerKey; + this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity ); + + for( N notification: notifications ) { + this.notificationQueue.add( notification ); + } + } + + boolean submitNotifications( Iterable notifications ) throws InterruptedException { + + queuingLock.lock(); + try { + + // Check the done flag - if true then #run is in the process of exiting so return + // false to indicate such. Otherwise, offer the notifications to the queue. + + if( done ) { + return false; + } + + for( N notification: notifications ) { + + while( true ) { + + // Try to offer for up to a minute and log a message if it times out. + + // FIXME: we loop forever to guarantee delivery however this leaves it open + // for 1 rogue listener to bring everyone to a halt. Another option is to + // limit the tries and give up after a while and drop the notification. + // Given a reasonably large queue capacity and long timeout, if we still + // can't queue then most likely the listener is an unrecoverable state + // (deadlock or endless loop). + + if( LOG.isDebugEnabled() ) { + LOG.debug( "{}: Offering notification to the queue for listener {}: {}", + name, listenerKey.getListener().getClass(), notification ); + } + + if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) { + break; + } + + LOG.warn( + "{}: Timed out trying to offer a notification to the queue for listener {}." + + "The queue has reached its capacity of {}", + name, listenerKey.getListener().getClass(), maxQueueCapacity ); + } + } + + // Set the queuedNotifications flag to tell #run that we've just queued + // notifications and not to exit yet, even if it thinks the queue is empty at this + // point. + + queuedNotifications = true; + + } finally { + queuingLock.unlock(); + } + + return true; + } + + @Override + public void run() { + + try { + // Loop until we've dispatched all the notifications in the queue. + + while( true ) { + + // Get the notification at the head of the queue, waiting a little bit for one + // to get offered. + + N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS ); + if( notification == null ) { + + // The queue is empty - try to get the queuingLock. If we can't get the lock + // then #submitNotifications is in the process of offering to the queue so + // we'll loop back up and poll the queue again. + + if( queuingLock.tryLock() ) { + try { + + // Check the queuedNotifications flag to see if #submitNotifications + // has offered new notification(s) to the queue. If so, loop back up + // and poll the queue again. Otherwise set done to true and exit. + // Once we set the done flag and unlock, calls to + // #submitNotifications will fail and a new task will be created. + + if( !queuedNotifications ) { + done = true; + break; + } + + // Clear the queuedNotifications flag so we'll try to exit the next + // time through the loop when the queue is empty. + + queuedNotifications = false; + + } finally { + queuingLock.unlock(); + } + } + } + + notifyListener( notification ); + } + } catch( InterruptedException e ) { + + // The executor is probably shutting down so log as debug. + LOG.debug( "{}: Interrupted trying to remove from {} listener's queue", + name, listenerKey.getListener().getClass() ); + } finally { + + // We're exiting, gracefully or not - either way make sure we always remove + // ourselves from the cache. + + listenerCache.remove( listenerKey ); + } + } + + private void notifyListener( N notification ) { + + if( notification == null ) { + return; + } + + try { + + if( LOG.isDebugEnabled() ) { + LOG.debug( "{}: Invoking listener {} with notification: {}", + name, listenerKey.getListener().getClass(), notification ); + } + + listenerInvoker.invokeListener( listenerKey.getListener(), notification ); + + } catch( RuntimeException e ) { + + // We'll let a RuntimeException from the listener slide and keep sending any + // remaining notifications. + + LOG.error( String.format( "%1$s: Error notifying listener %2$s", name, + listenerKey.getListener().getClass() ), e ); + + } catch( Error e ) { + + // A JVM Error is severe - best practice is to throw them up the chain. Set done to + // true so no new notifications can be added to this task as we're about to bail. + + done = true; + throw e; + } + } + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java new file mode 100644 index 0000000000..0548d7a091 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Factory methods for creating {@link ExecutorService} instances with specific configurations. + + * @author Thomas Pantelis + */ +public final class SpecialExecutors { + + private SpecialExecutors() { + } + + /** + * Creates an ExecutorService with a specified bounded queue capacity that favors creating new + * threads over queuing, as the former is faster, so threads will only be reused when the thread + * limit is exceeded and tasks are queued. If the maximum queue capacity is reached, subsequent + * tasks will be rejected. + *

+ * For example, if the maximum number of threads is 100 and 100 short-lived tasks are submitted + * within say 10 seconds, then 100 threads will be created and used - previously constructed + * idle threads will not be reused. This provides the fastest execution of the 100 tasks at the + * expense of memory and thread resource overhead. Therefore it is advisable to specify a + * relatively small thread limit (probably no more than 50). + *

+ * Threads that have not been used for 15 seconds are terminated and removed from the pool. + * Thus, a pool that remains idle for long enough will not consume any resources. + *

+ * If you need an executor with less memory and thread resource overhead where slower execution + * time is acceptable, consider using {@link #newBoundedCachedThreadPool }. + * + * @param maximumPoolSize + * the maximum number of threads to allow in the pool. Threads will terminate after + * being idle for 15 seconds. + * @param maximumQueueSize + * the capacity of the queue. + * @param threadPrefix + * the name prefix for threads created by this executor. + * @return a new ExecutorService with the specified configuration. + */ + public static ExecutorService newBoundedFastThreadPool( int maximumPoolSize, + int maximumQueueSize, String threadPrefix ) { + return new FastThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix ); + } + + /** + * Creates an ExecutorService similar to {@link #newBoundedFastThreadPool } except that it + * handles rejected tasks by running them in the same thread as the caller. Therefore if the + * queue is full, the caller submitting the task will be blocked until the task completes. In + * this manner, tasks are never rejected. + * + * @param maximumPoolSize + * the maximum number of threads to allow in the pool. Threads will terminate after + * being idle for 15 seconds. + * @param maximumQueueSize + * the capacity of the queue. + * @param threadPrefix + * the name prefix for threads created by this executor. + * @return a new ExecutorService with the specified configuration. + */ + public static ExecutorService newBlockingBoundedFastThreadPool( int maximumPoolSize, + int maximumQueueSize, String threadPrefix ) { + + FastThreadPoolExecutor executor = + new FastThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix ); + executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); + return executor; + } + + /** + * Creates an ExecutorService with a specified bounded queue capacity that favors reusing + * previously constructed threads, when they are available, over creating new threads. When a + * task is submitted, if no existing thread is available, a new thread will be created and added + * to the pool. If there is an existing idle thread available, the task will be handed to that + * thread to execute. If the specified maximum thread limit is reached, subsequent tasks will be + * queued and will execute as threads become available. If the maximum queue capacity is + * reached, subsequent tasks will be rejected. + *

+ * Threads that have not been used for sixty seconds are terminated and removed from the pool. + * Thus, a pool that remains idle for long enough will not consume any resources. + *

+ * By reusing threads when possible, this executor optimizes for reduced memory and thread + * resource overhead at the expense of execution time. + *

+ * If you need an executor with faster execution time where increased memory and thread resource + * overhead is acceptable, consider using {@link #newBoundedFastThreadPool }. + * + * @param maximumPoolSize + * the maximum number of threads to allow in the pool. Threads will terminate after + * being idle for 60 seconds. + * @param maximumQueueSize + * the capacity of the queue. + * @param threadPrefix + * the name prefix for threads created by this executor. + * @return a new ExecutorService with the specified configuration. + */ + public static ExecutorService newBoundedCachedThreadPool( int maximumPoolSize, + int maximumQueueSize, String threadPrefix ) { + return new CachedThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix ); + } + + /** + * Creates an ExecutorService similar to {@link #newBoundedCachedThreadPool } except that it + * handles rejected tasks by running them in the same thread as the caller. Therefore if the + * queue is full, the caller submitting the task will be blocked until the task completes. In + * this manner, tasks are never rejected. + * + * @param maximumPoolSize + * the maximum number of threads to allow in the pool. Threads will terminate after + * being idle for 60 seconds. + * @param maximumQueueSize + * the capacity of the queue. + * @param threadPrefix + * the name prefix for threads created by this executor. + * @return a new ExecutorService with the specified configuration. + */ + public static ExecutorService newBlockingBoundedCachedThreadPool( int maximumPoolSize, + int maximumQueueSize, String threadPrefix ) { + + CachedThreadPoolExecutor executor = + new CachedThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix ); + executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); + return executor; + } + + /** + * Creates an ExecutorService that uses a single worker thread operating off a bounded queue + * with the specified capacity. Tasks are guaranteed to execute sequentially, and no more than + * one task will be active at any given time. If the maximum queue capacity is reached, + * subsequent tasks will be rejected. + * + * @param maximumQueueSize + * the capacity of the queue. + * @param threadPrefix + * the name prefix for the thread created by this executor. + * @return a new ExecutorService with the specified configuration. + */ + public static ExecutorService newBoundedSingleThreadExecutor( int maximumQueueSize, + String threadPrefix ) { + return new FastThreadPoolExecutor( 1, maximumQueueSize, Long.MAX_VALUE, TimeUnit.SECONDS, + threadPrefix ); + } +} diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListeningExecutorServiceTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListeningExecutorServiceTest.java new file mode 100644 index 0000000000..1a3bf0d4fd --- /dev/null +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListeningExecutorServiceTest.java @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.After; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker; +import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE; +import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE; +import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT; + +/** + * Unit tests for AsyncNotifyingListeningExecutorService. + * + * @author Thomas Pantelis + */ +public class AsyncNotifyingListeningExecutorServiceTest { + + private ExecutorService listenerExecutor; + private AsyncNotifyingListeningExecutorService testExecutor; + + @After + public void tearDown() { + if( listenerExecutor != null ) { + listenerExecutor.shutdownNow(); + } + + if( testExecutor != null ) { + testExecutor.shutdownNow(); + } + } + + @Test + public void testListenerCallbackWithExecutor() throws InterruptedException { + + String listenerThreadPrefix = "ListenerThread"; + listenerExecutor = Executors.newFixedThreadPool( 3, + new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() ); + + testExecutor = new AsyncNotifyingListeningExecutorService( + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ), + listenerExecutor ); + + testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix ); + testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix ); + testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix ); + } + + @Test + public void testListenerCallbackWithNoExecutor() throws InterruptedException { + + String listenerThreadPrefix = "SingleThread"; + testExecutor = new AsyncNotifyingListeningExecutorService( + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix ).build() ), + null ); + + testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix ); + testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix ); + testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix ); + } + + static void testListenerCallback( AsyncNotifyingListeningExecutorService executor, + Invoker invoker, final String expListenerThreadPrefix ) throws InterruptedException { + + AtomicReference assertError = new AtomicReference<>(); + CountDownLatch futureNotifiedLatch = new CountDownLatch( 1 ); + CountDownLatch blockTaskLatch = new CountDownLatch( 1 ); + + // The blockTaskLatch is used to block the task from completing until we've added + // our listener to the Future. Otherwise, if the task completes quickly and the Future is + // set to done before we've added our listener, the call to ListenableFuture#addListener + // will immediately notify synchronously on this thread as Futures#addCallback defaults to + // a same thread executor. This would erroneously fail the test. + + ListenableFuture future = invoker.invokeExecutor( executor, blockTaskLatch ); + addCallback( future, futureNotifiedLatch, expListenerThreadPrefix, assertError ); + + // Now that we've added our listener, signal the latch to let the task complete. + + blockTaskLatch.countDown(); + + assertTrue( "ListenableFuture callback was not notified of onSuccess", + futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) ); + + if( assertError.get() != null ) { + throw assertError.get(); + } + + // Add another listener - since the Future is already complete, we expect the listener to be + // notified inline on this thread when it's added. + + futureNotifiedLatch = new CountDownLatch( 1 ); + addCallback( future, futureNotifiedLatch, Thread.currentThread().getName(), assertError ); + + assertTrue( "ListenableFuture callback was not notified of onSuccess", + futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) ); + + if( assertError.get() != null ) { + throw assertError.get(); + } + } + + static void addCallback( ListenableFuture future, + final CountDownLatch futureNotifiedLatch, + final String expListenerThreadPrefix, + final AtomicReference assertError ) { + + Futures.addCallback( future, new FutureCallback() { + @Override + public void onSuccess( Object result ) { + + try { + String theadName = Thread.currentThread().getName(); + assertTrue( "ListenableFuture callback was not notified on the listener executor." + + " Expected thread name prefix \"" + expListenerThreadPrefix + + "\". Actual thread name \"" + theadName + "\"", + theadName.startsWith( expListenerThreadPrefix ) ); + } catch( AssertionError e ) { + assertError.set( e ); + } finally { + futureNotifiedLatch.countDown(); + } + } + + @Override + public void onFailure( Throwable t ) { + // Shouldn't happen + t.printStackTrace(); + } + } ); + } + + @Test + public void testDelegatedMethods() throws InterruptedException { + + Runnable task = new Runnable() { + @Override + public void run(){ + } + }; + + List taskList = Lists.newArrayList(); + + ExecutorService mockDelegate = mock( ExecutorService.class ); + doNothing().when( mockDelegate ).execute( task ); + doNothing().when( mockDelegate ).shutdown(); + doReturn( taskList ).when( mockDelegate ).shutdownNow(); + doReturn( true ).when( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS ); + doReturn( true ).when( mockDelegate ).isShutdown(); + doReturn( true ).when( mockDelegate ).isTerminated(); + + AsyncNotifyingListeningExecutorService executor = new AsyncNotifyingListeningExecutorService( + mockDelegate, null ); + + executor.execute( task ); + executor.shutdown(); + assertEquals( "awaitTermination", true, executor.awaitTermination( 3, TimeUnit.SECONDS ) ); + assertSame( "shutdownNow", taskList, executor.shutdownNow() ); + assertEquals( "isShutdown", true, executor.isShutdown() ); + assertEquals( "isTerminated", true, executor.isTerminated() ); + + verify( mockDelegate ).execute( task ); + verify( mockDelegate ).shutdown(); + verify( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS ); + verify( mockDelegate ).shutdownNow(); + verify( mockDelegate ).isShutdown(); + verify( mockDelegate ).isTerminated(); + } +} diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CommonTestUtils.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CommonTestUtils.java new file mode 100644 index 0000000000..60c56a452c --- /dev/null +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CommonTestUtils.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.Uninterruptibles; + +/** + * Some common test utilities. + * + * @author Thomas Pantelis + */ +public class CommonTestUtils { + + public interface Invoker { + ListenableFuture invokeExecutor( ListeningExecutorService executor, + CountDownLatch blockingLatch ); + }; + + public static final Invoker SUBMIT_CALLABLE = new Invoker() { + @Override + public ListenableFuture invokeExecutor( ListeningExecutorService executor, + final CountDownLatch blockingLatch ) { + return executor.submit( new Callable() { + @Override + public Void call() throws Exception { + if( blockingLatch != null ) { + Uninterruptibles.awaitUninterruptibly( blockingLatch ); + } + return null; + } + } ); + } + }; + + public static final Invoker SUBMIT_RUNNABLE = new Invoker() { + @Override + public ListenableFuture invokeExecutor( ListeningExecutorService executor, + final CountDownLatch blockingLatch ) { + return executor.submit( new Runnable() { + @Override + public void run() { + if( blockingLatch != null ) { + Uninterruptibles.awaitUninterruptibly( blockingLatch ); + } + } + } ); + } + }; + + public static final Invoker SUBMIT_RUNNABLE_WITH_RESULT = new Invoker() { + @Override + public ListenableFuture invokeExecutor( ListeningExecutorService executor, + final CountDownLatch blockingLatch ) { + return executor.submit( new Runnable() { + @Override + public void run() { + if( blockingLatch != null ) { + Uninterruptibles.awaitUninterruptibly( blockingLatch ); + } + } + }, "foo" ); + } + }; +} diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorServiceTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorServiceTest.java index b23750da98..6bba351dca 100644 --- a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorServiceTest.java +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorServiceTest.java @@ -13,10 +13,12 @@ import static org.junit.Assert.*; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -25,6 +27,13 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback; +import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker; +import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE; +import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE; +import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT; /** * Unit tests for DeadlockDetectingListeningExecutorService. @@ -33,44 +42,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; */ public class DeadlockDetectingListeningExecutorServiceTest { - interface Invoker { - ListenableFuture invokeExecutor( ListeningExecutorService executor ); - }; - - static final Invoker SUBMIT_CALLABLE = new Invoker() { - @Override - public ListenableFuture invokeExecutor( ListeningExecutorService executor ) { - return executor.submit( new Callable() { - @Override - public String call() throws Exception{ - return "foo"; - } - } ); - } - }; - - static final Invoker SUBMIT_RUNNABLE = new Invoker() { - @Override - public ListenableFuture invokeExecutor( ListeningExecutorService executor ) { - return executor.submit( new Runnable() { - @Override - public void run(){ - } - } ); - } - }; - - static final Invoker SUBMIT_RUNNABLE_WITH_RESULT = new Invoker() { - @Override - public ListenableFuture invokeExecutor( ListeningExecutorService executor ) { - return executor.submit( new Runnable() { - @Override - public void run(){ - } - }, "foo" ); - } - }; - interface InitialInvoker { void invokeExecutor( ListeningExecutorService executor, Runnable task ); }; @@ -104,13 +75,25 @@ public class DeadlockDetectingListeningExecutorServiceTest { @Before public void setup() { - executor = new DeadlockDetectingListeningExecutorService( Executors.newSingleThreadExecutor(), - DEADLOCK_EXECUTOR_FUNCTION ); + } + + @After + public void tearDown() { + if( executor != null ) { + executor.shutdownNow(); + } + } + + DeadlockDetectingListeningExecutorService newExecutor() { + return new DeadlockDetectingListeningExecutorService( Executors.newSingleThreadExecutor(), + DEADLOCK_EXECUTOR_FUNCTION ); } @Test public void testBlockingSubmitOffExecutor() throws Exception { + executor = newExecutor(); + // Test submit with Callable. ListenableFuture future = executor.submit( new Callable() { @@ -144,6 +127,8 @@ public class DeadlockDetectingListeningExecutorServiceTest { @Test public void testNonBlockingSubmitOnExecutorThread() throws Throwable { + executor = newExecutor(); + testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE ); testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE ); testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT ); @@ -162,7 +147,7 @@ public class DeadlockDetectingListeningExecutorServiceTest { @Override public void run() { - Futures.addCallback( invoker.invokeExecutor( executor ), new FutureCallback() { + Futures.addCallback( invoker.invokeExecutor( executor, null ), new FutureCallback() { @Override public void onSuccess( Object result ) { futureCompletedLatch.countDown(); @@ -191,6 +176,8 @@ public class DeadlockDetectingListeningExecutorServiceTest { @Test public void testBlockingSubmitOnExecutorThread() throws Exception { + executor = newExecutor(); + testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE ); testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE ); testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT ); @@ -209,7 +196,7 @@ public class DeadlockDetectingListeningExecutorServiceTest { public void run() { try { - invoker.invokeExecutor( executor ).get(); + invoker.invokeExecutor( executor, null ).get(); } catch( ExecutionException e ) { caughtEx.set( e.getCause() ); } catch( Throwable e ) { @@ -229,4 +216,25 @@ public class DeadlockDetectingListeningExecutorServiceTest { assertNotNull( "Expected exception thrown", caughtEx.get() ); assertEquals( "Caught exception type", TestDeadlockException.class, caughtEx.get().getClass() ); } + + @Test + public void testListenableFutureCallbackWithExecutor() throws InterruptedException { + + String listenerThreadPrefix = "ListenerThread"; + ExecutorService listenerExecutor = Executors.newFixedThreadPool( 1, + new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() ); + + executor = new DeadlockDetectingListeningExecutorService( + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ), + DEADLOCK_EXECUTOR_FUNCTION, listenerExecutor ); + + try { + testListenerCallback( executor, SUBMIT_CALLABLE, listenerThreadPrefix ); + testListenerCallback( executor, SUBMIT_RUNNABLE, listenerThreadPrefix ); + testListenerCallback( executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix ); + } finally { + listenerExecutor.shutdownNow(); + } + } } diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java new file mode 100644 index 0000000000..d7e0e503fe --- /dev/null +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java @@ -0,0 +1,301 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Test; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; + +/** + * Unit tests for QueuedNotificationManager. + * + * @author Thomas Pantelis + */ +public class QueuedNotificationManagerTest { + + static class TestListener { + + private final List actual; + private volatile int expCount; + private volatile CountDownLatch latch; + volatile long sleepTime = 0; + volatile RuntimeException runtimeEx; + volatile Error jvmError; + boolean cacheNotifications = true; + String name; + + TestListener( int expCount, int id ) { + name = "TestListener " + id; + actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) ); + reset( expCount ); + } + + void reset( int expCount ) { + this.expCount = expCount; + latch = new CountDownLatch( expCount ); + actual.clear(); + } + + void onNotification( N data ) { + + try { + if( sleepTime > 0 ) { + Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS ); + } + + if( cacheNotifications ) { + actual.add( data ); + } + + RuntimeException localRuntimeEx = runtimeEx; + if( localRuntimeEx != null ) { + runtimeEx = null; + throw localRuntimeEx; + } + + Error localJvmError = jvmError; + if( localJvmError != null ) { + jvmError = null; + throw localJvmError; + } + + } finally { + latch.countDown(); + } + } + + void verifyNotifications() { + boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS ); + if( !done ) { + long actualCount = latch.getCount(); + fail( name + ": Received " + (expCount - actualCount) + + " notifications. Expected " + expCount ); + } + } + + void verifyNotifications( List expected ) { + verifyNotifications(); + assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual ); + } + + // Implement bad hashCode/equals methods to verify it doesn't screw up the + // QueuedNotificationManager as it should use reference identity. + @Override + public int hashCode(){ + return 1; + } + + @Override + public boolean equals( Object obj ){ + TestListener other = (TestListener) obj; + return other != null; + } + } + + static class TestListener2 extends TestListener { + TestListener2( int expCount, int id ) { + super(expCount, id); + } + } + + static class TestListener3 extends TestListener { + TestListener3( int expCount, int id ) { + super(expCount, id); + } + } + + static class TestNotifier implements QueuedNotificationManager.Invoker,N> { + + @Override + public void invokeListener( TestListener listener, N notification ) { + listener.onNotification( notification ); + } + } + + private ExecutorService queueExecutor; + + @After + public void tearDown() { + if( queueExecutor != null ) { + queueExecutor.shutdownNow(); + } + } + + @Test(timeout=10000) + public void testNotificationsWithSingleListener() { + + queueExecutor = Executors.newFixedThreadPool( 2 ); + NotificationManager, Integer> manager = + new QueuedNotificationManager<>( queueExecutor, new TestNotifier(), + 10, "TestMgr" ); + + int initialCount = 6; + int nNotifications = 100; + + TestListener listener = new TestListener<>( nNotifications, 1 ); + listener.sleepTime = 20; + + manager.submitNotifications( listener, Arrays.asList( 1, 2 ) ); + manager.submitNotification( listener, 3 ); + manager.submitNotifications( listener, Arrays.asList( 4, 5 ) ); + manager.submitNotification( listener, 6 ); + + manager.submitNotifications( null, Collections.emptyList() ); + manager.submitNotifications( listener, null ); + manager.submitNotification( listener, null ); + + Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS ); + + listener.sleepTime = 0; + + List expNotifications = Lists.newArrayListWithCapacity( nNotifications ); + expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) ); + for( int i = 1; i <= nNotifications - initialCount; i++ ) { + Integer v = Integer.valueOf( initialCount + i ); + expNotifications.add( v ); + manager.submitNotification( listener, v ); + } + + listener.verifyNotifications( expNotifications ); + } + + @Test + public void testNotificationsWithMultipleListeners() { + + int nListeners = 10; + queueExecutor = Executors.newFixedThreadPool( nListeners ); + final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners ); + final NotificationManager, Integer> manager = + new QueuedNotificationManager<>( queueExecutor, new TestNotifier(), + 5000, "TestMgr" ); + + final int nNotifications = 100000; + + System.out.println( "Testing " + nListeners + " listeners with " + nNotifications + + " notifications each..." ); + + final Integer[] notifications = new Integer[nNotifications]; + for( int i = 1; i <= nNotifications; i++ ) { + notifications[i-1] = Integer.valueOf( i ); + } + + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + + List> listeners = Lists.newArrayList(); + for( int i = 1; i <= nListeners; i++ ) { + final TestListener listener = + i == 2 ? new TestListener2( nNotifications, i ) : + i == 3 ? new TestListener3( nNotifications, i ) : + new TestListener( nNotifications, i ); + listeners.add( listener ); + + new Thread( new Runnable() { + @Override + public void run() { + for( int j = 1; j <= nNotifications; j++ ) { + final Integer n = notifications[j-1]; + stagingExecutor.execute( new Runnable() { + @Override + public void run() { + manager.submitNotification( listener, n ); + } + } ); + } + } + } ).start(); + } + + try { + for( TestListener listener: listeners ) { + listener.verifyNotifications(); + System.out.println( listener.name + " succeeded" ); + } + } finally { + stagingExecutor.shutdownNow(); + } + + stopWatch.stop(); + + System.out.println( "Elapsed time: " + stopWatch ); + System.out.println( queueExecutor ); + } + + @Test(timeout=10000) + public void testNotificationsWithListenerRuntimeEx() { + + queueExecutor = Executors.newFixedThreadPool( 1 ); + NotificationManager, Integer> manager = + new QueuedNotificationManager<>( queueExecutor, new TestNotifier(), + 10, "TestMgr" ); + + + TestListener listener = new TestListener<>( 2, 1 ); + listener.runtimeEx = new RuntimeException( "mock" ); + + manager.submitNotification( listener, 1 ); + manager.submitNotification( listener, 2 ); + + listener.verifyNotifications(); + } + + @Test(timeout=10000) + public void testNotificationsWithListenerJVMError() { + + final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 ); + queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS, + new LinkedBlockingQueue() ) { + @Override + public void execute( final Runnable command ) { + super.execute( new Runnable() { + @Override + public void run() { + try { + command.run(); + } catch( Error e ) { + errorCaughtLatch.countDown(); + } + } + }); + } + }; + + NotificationManager, Integer> manager = + new QueuedNotificationManager<>( queueExecutor, new TestNotifier(), + 10, "TestMgr" ); + + TestListener listener = new TestListener<>( 2, 1 ); + listener.jvmError = new Error( "mock" ); + + manager.submitNotification( listener, 1 ); + + assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly( + errorCaughtLatch, 5, TimeUnit.SECONDS ) ); + + manager.submitNotification( listener, 2 ); + + listener.verifyNotifications(); + } +} diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java new file mode 100644 index 0000000000..8270e45d35 --- /dev/null +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java @@ -0,0 +1,205 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.yangtools.util.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.After; +import org.junit.Test; + +import com.google.common.base.Stopwatch; + +/** + * Tests various ThreadPoolExecutor implementations. + * + * @author Thomas Pantelis + */ +public class ThreadPoolExecutorTest { + + private ExecutorService executor; + + @After + public void tearDown() { + if( executor != null ) { + executor.shutdownNow(); + } + } + + @Test + public void testFastThreadPoolExecution() throws Exception { + + testThreadPoolExecution( + SpecialExecutors.newBoundedFastThreadPool( 50, 100000, "TestPool" ), + 100000, "TestPool", 0 ); + } + + @Test(expected=RejectedExecutionException.class) + public void testFastThreadPoolRejectingTask() throws Exception { + + executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" ); + + for( int i = 0; i < 5; i++ ) { + executor.execute( new Task( null, null, null, null, + TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) ); + } + } + + @Test + public void testBlockingFastThreadPoolExecution() throws Exception { + + // With a queue capacity of 1, it should block at some point. + testThreadPoolExecution( + SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" ), + 1000, null, 10 ); + } + + @Test + public void testCachedThreadPoolExecution() throws Exception { + + testThreadPoolExecution( + SpecialExecutors.newBoundedCachedThreadPool( 10, 100000, "TestPool" ), + 100000, "TestPool", 0 ); + } + + @Test(expected=RejectedExecutionException.class) + public void testCachedThreadRejectingTask() throws Exception { + + ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" ); + + for( int i = 0; i < 5; i++ ) { + executor.execute( new Task( null, null, null, null, + TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) ); + } + } + + @Test + public void testBlockingCachedThreadPoolExecution() throws Exception { + + testThreadPoolExecution( + SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" ), + 1000, null, 10 ); + } + + void testThreadPoolExecution( final ExecutorService executor, + final int numTasksToRun, final String expThreadPrefix, final long taskDelay ) throws Exception { + + this.executor = executor; + + System.out.println( "\nTesting " + executor.getClass().getSimpleName() + " with " + + numTasksToRun + " tasks." ); + + final CountDownLatch tasksRunLatch = new CountDownLatch( numTasksToRun ); + final ConcurrentMap taskCountPerThread = new ConcurrentHashMap<>(); + final AtomicReference threadError = new AtomicReference<>(); + + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + + new Thread() { + @Override + public void run() { + for( int i = 0; i < numTasksToRun; i++ ) { +// if(i%100 == 0) { +// Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS ); +// } + + executor.execute( new Task( tasksRunLatch, taskCountPerThread, + threadError, expThreadPrefix, taskDelay ) ); + } + } + }.start(); + + boolean done = tasksRunLatch.await( 15, TimeUnit.SECONDS ); + + stopWatch.stop(); + + if( !done ) { + fail( (numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " + + numTasksToRun + " executed" ); + } + + if( threadError.get() != null ) { + throw threadError.get(); + } + + System.out.println( taskCountPerThread.size() + " threads used:" ); + for( Map.Entry e : taskCountPerThread.entrySet() ) { + System.out.println( " " + e.getKey().getName() + " - " + e.getValue() + " tasks" ); + } + + System.out.println( "\n" + executor ); + System.out.println( "\nElapsed time: " + stopWatch ); + System.out.println(); + } + + private static class Task implements Runnable { + final CountDownLatch tasksRunLatch; + final ConcurrentMap taskCountPerThread; + final AtomicReference threadError; + final String expThreadPrefix; + final long delay; + + Task( CountDownLatch tasksRunLatch, ConcurrentMap taskCountPerThread, + AtomicReference threadError, String expThreadPrefix, long delay ) { + this.tasksRunLatch = tasksRunLatch; + this.taskCountPerThread = taskCountPerThread; + this.threadError = threadError; + this.expThreadPrefix = expThreadPrefix; + this.delay = delay; + } + + @Override + public void run() { + try { + if( delay > 0 ) { + try { + TimeUnit.MICROSECONDS.sleep( delay ); + } catch( InterruptedException e ) {} + } + + if( expThreadPrefix != null ) { + assertEquals( "Thread name starts with " + expThreadPrefix, true, + Thread.currentThread().getName().startsWith( expThreadPrefix ) ); + } + + if( taskCountPerThread != null ) { + AtomicLong count = taskCountPerThread.get( Thread.currentThread() ); + if( count == null ) { + count = new AtomicLong( 0 ); + AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count ); + if( prev != null ) { + count = prev; + } + } + + count.incrementAndGet(); + } + + } catch( AssertionError e ) { + if( threadError != null ) { + threadError.set( e ); + } + } finally { + if( tasksRunLatch != null ) { + tasksRunLatch.countDown(); + } + } + } + } +} -- 2.36.6