<extensions>true</extensions>
<configuration>
<instructions>
- <Export-Package>
- org.opendaylight.yangtools.util,
- org.opendaylight.yangtools.util.concurrent
- </Export-Package>
<Embed-Dependency>java-concurrent-hash-trie-map;inline=true</Embed-Dependency>
</instructions>
</configuration>
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+/**
+ * Class that calculates and tracks time duration statistics.
+ *
+ * @author Thomas Pantelis
+ */
+public class DurationStatsTracker {
+
+ private final AtomicLong totalDurations = new AtomicLong();
+ private final AtomicLong longestDuration = new AtomicLong();
+ private volatile long timeOfLongestDuration;
+ private final AtomicLong shortestDuration = new AtomicLong(Long.MAX_VALUE);
+ private volatile long timeOfShortestDuration;
+ private final AtomicDouble averageDuration = new AtomicDouble();
+
+ /**
+ * Add a duration to track.
+ *
+ * @param duration the duration in nanoseconds.
+ */
+ public void addDuration(long duration) {
+
+ double currentAve = averageDuration.get();
+ long currentTotal = totalDurations.get();
+
+ long newTotal = currentTotal + 1;
+
+ // Calculate moving cumulative average.
+ double newAve = currentAve * currentTotal / newTotal + duration / newTotal;
+
+ averageDuration.compareAndSet(currentAve, newAve);
+ totalDurations.compareAndSet(currentTotal, newTotal);
+
+ long longest = longestDuration.get();
+ if( duration > longest ) {
+ if(longestDuration.compareAndSet( longest, duration )) {
+ timeOfLongestDuration = System.currentTimeMillis();
+ }
+ }
+
+ long shortest = shortestDuration.get();
+ if( duration < shortest ) {
+ if(shortestDuration.compareAndSet( shortest, duration )) {
+ timeOfShortestDuration = System.currentTimeMillis();
+ }
+ }
+ }
+
+ /**
+ * Returns the total number of tracked durations.
+ */
+ public long getTotalDurations() {
+ return totalDurations.get();
+ }
+
+ /**
+ * Returns the longest duration in nanoseconds.
+ */
+ public long getLongestDuration() {
+ return longestDuration.get();
+ }
+
+ /**
+ * Returns the shortest duration in nanoseconds.
+ */
+ public long getShortestDuration() {
+ long shortest = shortestDuration.get();
+ return shortest < Long.MAX_VALUE ? shortest : 0;
+ }
+
+ /**
+ * Returns the average duration in nanoseconds.
+ */
+ public double getAverageDuration() {
+ return averageDuration.get();
+ }
+
+ /**
+ * Returns the time stamp of the longest duration.
+ */
+ public long getTimeOfLongestDuration() {
+ return timeOfLongestDuration;
+ }
+
+ /**
+ * Returns the time stamp of the shortest duration.
+ */
+ public long getTimeOfShortestDuration() {
+ return timeOfShortestDuration;
+ }
+
+ /**
+ * Resets all statistics back to their defaults.
+ */
+ public void reset() {
+ totalDurations.set(0);
+ longestDuration.set(0);
+ timeOfLongestDuration = 0;
+ shortestDuration.set(Long.MAX_VALUE);
+ timeOfShortestDuration = 0;
+ averageDuration.set(0.0);
+ }
+
+ /**
+ * Returns the average duration as a displayable String with units, e.g. "12.34 ms".
+ */
+ public String getDisplayableAverageDuration() {
+ return formatDuration(getAverageDuration(), 0);
+ }
+
+ /**
+ * Returns the shortest duration as a displayable String with units and the date/time at
+ * which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24".
+ */
+ public String getDisplayableShortestDuration() {
+ return formatDuration(getShortestDuration(), getTimeOfShortestDuration());
+ }
+
+ /**
+ * Returns the longest duration as a displayable String with units and the date/time at
+ * which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24".
+ */
+ public String getDisplayableLongestDuration() {
+ return formatDuration(getLongestDuration(), getTimeOfLongestDuration());
+ }
+
+ private String formatDuration(double duration, long timeStamp) {
+ TimeUnit unit = chooseUnit((long)duration);
+ double value = duration / NANOSECONDS.convert(1, unit);
+ return timeStamp > 0 ?
+ String.format("%.4g %s at %3$tD %3$tT", value, abbreviate(unit), new Date(timeStamp)) :
+ String.format("%.4g %s", value, abbreviate(unit));
+ }
+
+ private static TimeUnit chooseUnit(long nanos) {
+ if(SECONDS.convert(nanos, NANOSECONDS) > 0) {
+ return SECONDS;
+ }
+
+ if(MILLISECONDS.convert(nanos, NANOSECONDS) > 0) {
+ return MILLISECONDS;
+ }
+
+ if(MICROSECONDS.convert(nanos, NANOSECONDS) > 0) {
+ return MICROSECONDS;
+ }
+
+ return NANOSECONDS;
+ }
+
+ private static String abbreviate(TimeUnit unit) {
+ switch(unit) {
+ case NANOSECONDS:
+ return "ns";
+ case MICROSECONDS:
+ return "\u03bcs"; // μs
+ case MILLISECONDS:
+ return "ms";
+ case SECONDS:
+ return "s";
+ default:
+ return "";
+ }
+ }
+}
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import com.google.common.base.Objects;
import com.google.common.base.Objects.ToStringHelper;
import com.google.common.base.Preconditions;
private static final long IDLE_TIMEOUT_IN_SEC = 60L;
- private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 );
-
private final ExecutorQueue executorQueue;
private final String threadPrefix;
executorQueue = (ExecutorQueue)super.getQueue();
rejectedTaskHandler = new RejectedTaskHandler(
- executorQueue.getBackingQueue(), largestBackingQueueSize );
+ executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
super.setRejectedExecutionHandler( rejectedTaskHandler );
}
@Override
public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
+ Preconditions.checkNotNull( handler );
rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
}
+ @Override
+ public RejectedExecutionHandler getRejectedExecutionHandler(){
+ return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
+ }
+
@Override
public BlockingQueue<Runnable> getQueue(){
return executorQueue.getBackingQueue();
}
public long getLargestQueueSize() {
- return largestBackingQueueSize.get();
+ return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
}
protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
private final LinkedBlockingQueue<Runnable> backingQueue;
ExecutorQueue( int maxBackingQueueSize ) {
- backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize );
+ backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
}
LinkedBlockingQueue<Runnable> getBackingQueue() {
private static class RejectedTaskHandler implements RejectedExecutionHandler {
private final LinkedBlockingQueue<Runnable> backingQueue;
- private final AtomicLong largestBackingQueueSize;
private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
- AtomicLong largestBackingQueueSize ) {
+ RejectedExecutionHandler delegateRejectedExecutionHandler ) {
this.backingQueue = backingQueue;
- this.largestBackingQueueSize = largestBackingQueueSize;
+ this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
void setDelegateRejectedExecutionHandler(
- RejectedExecutionHandler delegateRejectedExecutionHandler ){
+ RejectedExecutionHandler delegateRejectedExecutionHandler ) {
this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
+ RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+ return delegateRejectedExecutionHandler;
+ }
+
@Override
public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
if( executor.isShutdown() ) {
}
if( !backingQueue.offer( task ) ) {
- if( delegateRejectedExecutionHandler != null ) {
- delegateRejectedExecutionHandler.rejectedExecution( task, executor );
- } else {
- throw new RejectedExecutionException(
- "All threads are in use and the queue is full" );
- }
- }
-
- largestBackingQueueSize.incrementAndGet();
- long size = backingQueue.size();
- long largest = largestBackingQueueSize.get();
- if( size > largest ) {
- largestBackingQueueSize.compareAndSet( largest, size );
+ delegateRejectedExecutionHandler.rejectedExecution( task, executor );
}
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A RejectedExecutionHandler that delegates to a backing RejectedExecutionHandler and counts the
+ * number of rejected tasks.
+ *
+ * @author Thomas Pantelis
+ */
+public class CountingRejectedExecutionHandler implements RejectedExecutionHandler {
+
+ private final RejectedExecutionHandler delegate;
+ private final AtomicLong rejectedTaskCounter = new AtomicLong();
+
+ /**
+ * Constructor.
+ *
+ * @param delegate the backing RejectedExecutionHandler.
+ */
+ public CountingRejectedExecutionHandler( RejectedExecutionHandler delegate ) {
+ this.delegate = Preconditions.checkNotNull( delegate );
+ }
+
+ @Override
+ public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
+ rejectedTaskCounter.incrementAndGet();
+ delegate.rejectedExecution( task, executor );
+ }
+
+ /**
+ * Returns the rejected task count.
+ */
+ public long getRejectedTaskCount(){
+ return rejectedTaskCounter.get();
+ }
+
+ /**
+ * Returns s counting handler for rejected tasks that runs the rejected task directly in the
+ * calling thread of the execute method, unless the executor has been shut down, in which case
+ * the task is discarded.
+ */
+ public static CountingRejectedExecutionHandler newCallerRunsPolicy() {
+ return new CountingRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
+ }
+
+ /**
+ * Returns a counting handler for rejected tasks that throws a RejectedExecutionException.
+ */
+ public static CountingRejectedExecutionHandler newAbortPolicy() {
+ return new CountingRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
+ }
+
+ /**
+ * Returns a counting handler for rejected tasks that that blocks on the
+ * {@link ThreadPoolExecutor}'s backing queue until it can add the task to the queue.
+ */
+ public static CountingRejectedExecutionHandler newCallerWaitsPolicy() {
+ return new CountingRejectedExecutionHandler( ExecutorServiceUtil.waitInQueueExecutionHandler() );
+ }
+}
package org.opendaylight.yangtools.util.concurrent;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
// reached, subsequent tasks will be queued. If the queue is full, tasks will be rejected.
super( maximumPoolSize, maximumPoolSize, keepAliveTime, unit,
- new LinkedBlockingQueue<Runnable>( maximumQueueSize ) );
+ new TrackingLinkedBlockingQueue<Runnable>( maximumQueueSize ) );
this.threadPrefix = threadPrefix;
this.maximumQueueSize = maximumQueueSize;
// Need to specifically configure core threads to timeout.
allowCoreThreadTimeOut( true );
}
+
+ setRejectedExecutionHandler( CountingRejectedExecutionHandler.newAbortPolicy() );
+ }
+
+ public long getLargestQueueSize() {
+ return ((TrackingLinkedBlockingQueue<?>)getQueue()).getLargestQueueSize();
}
protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
.add( "Largest Thread Pool Size", getLargestPoolSize() )
.add( "Max Thread Pool Size", getMaximumPoolSize() )
.add( "Current Queue Size", getQueue().size() )
+ .add( "Largest Queue Size", getLargestQueueSize() )
.add( "Max Queue Size", maximumQueueSize )
.add( "Active Thread Count", getActiveCount() )
.add( "Completed Task Count", getCompletedTaskCount() )
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import java.beans.ConstructorProperties;
+
+/**
+ * Class used by the {@link QueuedNotificationManager} that contains a snapshot of notification
+ * queue statistics for a listener.
+ *
+ * @author Thomas Pantelis
+ * @see QueuedNotificationManager
+ */
+public class ListenerNotificationQueueStats {
+
+ private final String listenerClassName;
+ private final int currentQueueSize;
+
+ @ConstructorProperties({"listenerClassName","currentQueueSize"})
+ public ListenerNotificationQueueStats( String listenerClassName, int currentQueueSize ) {
+ this.listenerClassName = listenerClassName;
+ this.currentQueueSize = currentQueueSize;
+ }
+
+ /**
+ * Returns the name of the listener class.
+ */
+ public String getListenerClassName(){
+ return listenerClassName;
+ }
+
+ /**
+ * Returns the current notification queue size.
+ */
+ public int getCurrentQueueSize(){
+ return currentQueueSize;
+ }
+}
package org.opendaylight.yangtools.util.concurrent;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
}
}
+ /**
+ * Returns {@link ListenerNotificationQueueStats} instances for each current listener
+ * notification task in progress.
+ */
+ public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
+ List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
+ for( NotificationTask task: listenerCache.values() ) {
+ statsList.add( new ListenerNotificationQueueStats(
+ task.listenerKey.getListener().getClass().getName(),
+ task.notificationQueue.size() ) );
+ }
+
+ return statsList ;
+ }
+
+ /**
+ * Returns the maximum listener queue capacity.
+ */
+ public int getMaxQueueCapacity(){
+ return maxQueueCapacity;
+ }
+
+ /**
+ * Returns the {@link Executor} to used for notification tasks.
+ */
+ public Executor getExecutor(){
+ return executor;
+ }
+
/**
* Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
* Since we don't know anything about the listener class implementations and we're mixing
package org.opendaylight.yangtools.util.concurrent;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
FastThreadPoolExecutor executor =
new FastThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix );
- executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
+ executor.setRejectedExecutionHandler( CountingRejectedExecutionHandler.newCallerRunsPolicy() );
return executor;
}
CachedThreadPoolExecutor executor =
new CachedThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix );
- executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
+ executor.setRejectedExecutionHandler( CountingRejectedExecutionHandler.newCallerRunsPolicy() );
return executor;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+/**
+ * A {@link LinkedBlockingQueue} that tracks the largest queue size for debugging.
+ *
+ * @author Thomas Pantelis
+ *
+ * @param <E> the element t.ype
+ */
+public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Holds largestQueueSize, this long field should be only accessed
+ * using {@value #LARGEST_QUEUE_SIZE_UPDATER}
+ */
+ private volatile long largestQueueSize = 0;
+
+ @SuppressWarnings("rawtypes")
+ private static AtomicLongFieldUpdater<TrackingLinkedBlockingQueue> LARGEST_QUEUE_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(TrackingLinkedBlockingQueue.class, "largestQueueSize");
+
+ /**
+ * @see LinkedBlockingQueue#LinkedBlockingQueue
+ */
+ public TrackingLinkedBlockingQueue() {
+ super();
+ }
+
+ /**
+ * @see LinkedBlockingQueue#LinkedBlockingQueue(Collection)
+ */
+ public TrackingLinkedBlockingQueue( Collection<? extends E> c ) {
+ super(c);
+ }
+
+ /**
+ * @see LinkedBlockingQueue#LinkedBlockingQueue(int)
+ */
+ public TrackingLinkedBlockingQueue( int capacity ) {
+ super(capacity);
+ }
+
+ /**
+ * Returns the largest queue size.
+ */
+ public long getLargestQueueSize(){
+ return largestQueueSize;
+ }
+
+ @Override
+ public boolean offer( E e, long timeout, TimeUnit unit ) throws InterruptedException {
+ if( super.offer( e, timeout, unit ) ) {
+ updateLargestQueueSize();
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean offer( E e ) {
+ if( super.offer( e ) ) {
+ updateLargestQueueSize();
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void put( E e ) throws InterruptedException {
+ super.put( e );
+ updateLargestQueueSize();
+ }
+
+ @Override
+ public boolean add( E e ) {
+ boolean result = super.add( e );
+ updateLargestQueueSize();
+ return result;
+ }
+
+ @Override
+ public boolean addAll( Collection<? extends E> c ) {
+ try {
+ return super.addAll( c );
+ } finally {
+ updateLargestQueueSize();
+ }
+ }
+
+ private void updateLargestQueueSize() {
+ long size = size();
+ long largest = largestQueueSize;
+ if( size > largest ) {
+ LARGEST_QUEUE_SIZE_UPDATER.compareAndSet(this, largest, size );
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for DurationStatsTracker.
+ *
+ * @author Thomas Pantelis
+ */
+public class DurationStatsTrackerTest {
+
+ @Test
+ public void test() {
+
+ DurationStatsTracker tracker = new DurationStatsTracker();
+
+ tracker.addDuration(10000);
+ assertEquals("getTotalDurations", 1, tracker.getTotalDurations());
+ assertEquals("getAverageDuration", 10000.0, tracker.getAverageDuration(), 0.1);
+ assertEquals("getLongestDuration", 10000, tracker.getLongestDuration());
+ assertEquals("getShortestDuration", 10000, tracker.getShortestDuration());
+
+ tracker.addDuration(30000);
+ assertEquals("getTotalDurations", 2, tracker.getTotalDurations());
+ assertEquals("getAverageDuration", 20000.0, tracker.getAverageDuration(), 0.1);
+ assertEquals("getLongestDuration", 30000, tracker.getLongestDuration());
+ assertEquals("getShortestDuration", 10000, tracker.getShortestDuration());
+
+ verifyDisplayableString("getDisplayableAverageDuration",
+ tracker.getDisplayableAverageDuration(), "20.0");
+ verifyDisplayableString("getDisplayableLongestDuration",
+ tracker.getDisplayableLongestDuration(), "30.0");
+ verifyDisplayableString("getDisplayableShortestDuration",
+ tracker.getDisplayableShortestDuration(), "10.0");
+
+ tracker.addDuration(10000);
+ assertEquals("getTotalDurations", 3, tracker.getTotalDurations());
+ assertEquals("getAverageDuration", 16666.0, tracker.getAverageDuration(), 1.0);
+ assertEquals("getLongestDuration", 30000, tracker.getLongestDuration());
+ assertEquals("getShortestDuration", 10000, tracker.getShortestDuration());
+
+ tracker.addDuration(5000);
+ assertEquals("getTotalDurations", 4, tracker.getTotalDurations());
+ assertEquals("getAverageDuration", 13750.0, tracker.getAverageDuration(), 1.0);
+ assertEquals("getLongestDuration", 30000, tracker.getLongestDuration());
+ assertEquals("getShortestDuration", 5000, tracker.getShortestDuration());
+
+ tracker.reset();
+ assertEquals("getTotalDurations", 0, tracker.getTotalDurations());
+ assertEquals("getAverageDuration", 0.0, tracker.getAverageDuration(), 0.1);
+ assertEquals("getLongestDuration", 0, tracker.getLongestDuration());
+ assertEquals("getShortestDuration", 0, tracker.getShortestDuration());
+
+ tracker.addDuration(10000);
+ assertEquals("getTotalDurations", 1, tracker.getTotalDurations());
+ assertEquals("getAverageDuration", 10000.0, tracker.getAverageDuration(), 0.1);
+ assertEquals("getLongestDuration", 10000, tracker.getLongestDuration());
+ assertEquals("getShortestDuration", 10000, tracker.getShortestDuration());
+ }
+
+ private void verifyDisplayableString(String name, String actual, String expPrefix) {
+ assertEquals(name + " starts with " + expPrefix + ". Actual: " + actual,
+ true, actual.startsWith(expPrefix));
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+import org.opendaylight.yangtools.util.concurrent.ThreadPoolExecutorTest.Task;
+
+/**
+ * Unit tests for CountingRejectedExecutionHandler.
+ *
+ * @author Thomas Pantelis
+ */
+public class CountingRejectedExecutionHandlerTest {
+
+ private ThreadPoolExecutor executor;
+
+ @After
+ public void tearDown() {
+ if( executor != null ) {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testCallerRunsPolicyHandler() throws InterruptedException {
+
+ int nTasks = 5;
+ CountDownLatch tasksRunLatch = new CountDownLatch( 1 );
+ CountDownLatch blockLatch = new CountDownLatch( 1 );
+
+ executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
+ ExecutorServiceUtil.offerFailingBlockingQueue( new LinkedBlockingQueue<Runnable>() ) );
+
+ CountingRejectedExecutionHandler countingHandler =
+ CountingRejectedExecutionHandler.newCallerRunsPolicy();
+ executor.setRejectedExecutionHandler( countingHandler );
+
+ executor.execute( new Task( tasksRunLatch, blockLatch ) );
+
+ for( int i = 0; i < nTasks - 1; i++ ) {
+ executor.execute( new Task( null, null, null, null, 0 ) );
+ }
+
+ assertEquals( "getRejectedTaskCount", nTasks - 1, countingHandler.getRejectedTaskCount() );
+
+ blockLatch.countDown();
+
+ assertEquals( "Tasks complete", true, tasksRunLatch.await( 5, TimeUnit.SECONDS ) );
+ }
+
+ @Test
+ public void testAbortPolicyHandler() throws InterruptedException {
+
+ int nTasks = 5;
+ CountDownLatch tasksRunLatch = new CountDownLatch( 1 );
+ CountDownLatch blockLatch = new CountDownLatch( 1 );
+
+ executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
+ ExecutorServiceUtil.offerFailingBlockingQueue( new LinkedBlockingQueue<Runnable>() ) );
+
+ CountingRejectedExecutionHandler countingHandler =
+ CountingRejectedExecutionHandler.newAbortPolicy();
+ executor.setRejectedExecutionHandler( countingHandler );
+
+ executor.execute( new Task( tasksRunLatch, blockLatch ) );
+
+ for( int i = 0; i < nTasks - 1; i++ ) {
+ try {
+ executor.execute( new Task( null, null, null, null, 0 ) );
+ fail( "Expected RejectedExecutionException" );
+ } catch( RejectedExecutionException e ) {
+ // Expected
+ }
+ }
+
+ assertEquals( "getRejectedTaskCount", nTasks - 1, countingHandler.getRejectedTaskCount() );
+
+ blockLatch.countDown();
+
+ assertEquals( "Tasks complete", true, tasksRunLatch.await( 5, TimeUnit.SECONDS ) );
+ }
+}
System.out.println();
}
- private static class Task implements Runnable {
+ static class Task implements Runnable {
final CountDownLatch tasksRunLatch;
+ final CountDownLatch blockLatch;
final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
final AtomicReference<AssertionError> threadError;
final String expThreadPrefix;
this.threadError = threadError;
this.expThreadPrefix = expThreadPrefix;
this.delay = delay;
+ blockLatch = null;
+ }
+
+ Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
+ this.tasksRunLatch = tasksRunLatch;
+ this.blockLatch = blockLatch;
+ this.taskCountPerThread = null;
+ this.threadError = null;
+ this.expThreadPrefix = null;
+ this.delay = 0;
}
@Override
public void run() {
try {
- if( delay > 0 ) {
- try {
+ try {
+ if( delay > 0 ) {
TimeUnit.MICROSECONDS.sleep( delay );
- } catch( InterruptedException e ) {}
- }
+ } else if( blockLatch != null ) {
+ blockLatch.await();
+ }
+ } catch( InterruptedException e ) {}
if( expThreadPrefix != null ) {
assertEquals( "Thread name starts with " + expThreadPrefix, true,
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for TrackingLinkedBlockingQueue.
+ *
+ * @author Thomas Pantelis
+ */
+public class TrackingLinkedBlockingQueueTest {
+
+ @Test
+ public void testOffer() throws InterruptedException {
+
+ TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>( 2 );
+
+ assertEquals( "offer", true, queue.offer( "1" ) );
+ assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() );
+ assertEquals( "size", 1, queue.size() );
+
+ assertEquals( "offer", true, queue.offer( "2", 1, TimeUnit.MILLISECONDS ) );
+ assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+ assertEquals( "size", 2, queue.size() );
+
+ assertEquals( "offer", false, queue.offer( "3" ) );
+ assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+ assertEquals( "size", 2, queue.size() );
+
+ assertEquals( "offer", false, queue.offer( "4", 1, TimeUnit.MILLISECONDS ) );
+ assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+ assertEquals( "size", 2, queue.size() );
+ }
+
+ @Test
+ public void testPut() throws InterruptedException {
+
+ TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>();
+
+ queue.put( "1" );
+ assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() );
+ assertEquals( "size", 1, queue.size() );
+
+ queue.put( "2" );
+ assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+ assertEquals( "size", 2, queue.size() );
+ }
+
+ @Test
+ public void testAdd() {
+
+ TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>( 2 );
+
+ assertEquals( "add", true, queue.add( "1" ) );
+ assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() );
+ assertEquals( "size", 1, queue.size() );
+
+ assertEquals( "add", true, queue.add( "2" ) );
+ assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+ assertEquals( "size", 2, queue.size() );
+
+ try {
+ queue.add( "3" );
+ fail( "Expected IllegalStateException" );
+ } catch( IllegalStateException e ) {
+ // Expected
+ assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+ assertEquals( "size", 2, queue.size() );
+ }
+ }
+
+ @Test
+ public void testAddAll() {
+
+ TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>( 3 );
+
+ queue.addAll( Arrays.asList( "1", "2" ) );
+ assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+ assertEquals( "size", 2, queue.size() );
+
+ try {
+ queue.addAll( Arrays.asList( "3", "4" ) );
+ fail( "Expected IllegalStateException" );
+ } catch( IllegalStateException e ) {
+ // Expected
+ assertEquals( "getLargestQueueSize", 3, queue.getLargestQueueSize() );
+ assertEquals( "size", 3, queue.size() );
+ }
+ }
+}