From 3e0931740202d83c5f5fbec4172cda20baae2594 Mon Sep 17 00:00:00 2001 From: tpantelis Date: Thu, 7 Aug 2014 17:32:09 -0400 Subject: [PATCH] Bug 1446: Add new concurrent classes for tracking stats TrackingLinkedBlockingQueue A LinkedBlockingQueue that tracks the largest queue size foridebugging. CountingRejectedExecutionHandler A RejectedExecutionHandler that delegates to a backing RejectedExecutionHandler and counts the number of rejected tasks. DurationStatsTracker Class that calculates and tracks time duration statistics. Change-Id: I1b7bbb45301d8cf682a5de5d51cc64727e3139dc Signed-off-by: tpantelis --- common/util/pom.xml | 4 - .../yangtools/util/DurationStatsTracker.java | 185 ++++++++++++++++++ .../concurrent/CachedThreadPoolExecutor.java | 41 ++-- .../CountingRejectedExecutionHandler.java | 75 +++++++ .../concurrent/FastThreadPoolExecutor.java | 10 +- .../ListenerNotificationQueueStats.java | 44 +++++ .../concurrent/QueuedNotificationManager.java | 31 +++ .../util/concurrent/SpecialExecutors.java | 5 +- .../TrackingLinkedBlockingQueue.java | 114 +++++++++++ .../util/DurationStatsTrackerTest.java | 75 +++++++ .../CountingRejectedExecutionHandlerTest.java | 99 ++++++++++ .../concurrent/ThreadPoolExecutorTest.java | 23 ++- .../TrackingLinkedBlockingQueueTest.java | 103 ++++++++++ 13 files changed, 771 insertions(+), 38 deletions(-) create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/DurationStatsTracker.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandler.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ListenerNotificationQueueStats.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueue.java create mode 100644 common/util/src/test/java/org/opendaylight/yangtools/util/DurationStatsTrackerTest.java create mode 100644 common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandlerTest.java create mode 100644 common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueueTest.java diff --git a/common/util/pom.xml b/common/util/pom.xml index 5d185efe51..f3502891ba 100644 --- a/common/util/pom.xml +++ b/common/util/pom.xml @@ -61,10 +61,6 @@ true - - org.opendaylight.yangtools.util, - org.opendaylight.yangtools.util.concurrent - java-concurrent-hash-trie-map;inline=true diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/DurationStatsTracker.java b/common/util/src/main/java/org/opendaylight/yangtools/util/DurationStatsTracker.java new file mode 100644 index 0000000000..45ebd01306 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/DurationStatsTracker.java @@ -0,0 +1,185 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.Date; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.util.concurrent.AtomicDouble; + +/** + * Class that calculates and tracks time duration statistics. + * + * @author Thomas Pantelis + */ +public class DurationStatsTracker { + + private final AtomicLong totalDurations = new AtomicLong(); + private final AtomicLong longestDuration = new AtomicLong(); + private volatile long timeOfLongestDuration; + private final AtomicLong shortestDuration = new AtomicLong(Long.MAX_VALUE); + private volatile long timeOfShortestDuration; + private final AtomicDouble averageDuration = new AtomicDouble(); + + /** + * Add a duration to track. + * + * @param duration the duration in nanoseconds. + */ + public void addDuration(long duration) { + + double currentAve = averageDuration.get(); + long currentTotal = totalDurations.get(); + + long newTotal = currentTotal + 1; + + // Calculate moving cumulative average. + double newAve = currentAve * currentTotal / newTotal + duration / newTotal; + + averageDuration.compareAndSet(currentAve, newAve); + totalDurations.compareAndSet(currentTotal, newTotal); + + long longest = longestDuration.get(); + if( duration > longest ) { + if(longestDuration.compareAndSet( longest, duration )) { + timeOfLongestDuration = System.currentTimeMillis(); + } + } + + long shortest = shortestDuration.get(); + if( duration < shortest ) { + if(shortestDuration.compareAndSet( shortest, duration )) { + timeOfShortestDuration = System.currentTimeMillis(); + } + } + } + + /** + * Returns the total number of tracked durations. + */ + public long getTotalDurations() { + return totalDurations.get(); + } + + /** + * Returns the longest duration in nanoseconds. + */ + public long getLongestDuration() { + return longestDuration.get(); + } + + /** + * Returns the shortest duration in nanoseconds. + */ + public long getShortestDuration() { + long shortest = shortestDuration.get(); + return shortest < Long.MAX_VALUE ? shortest : 0; + } + + /** + * Returns the average duration in nanoseconds. + */ + public double getAverageDuration() { + return averageDuration.get(); + } + + /** + * Returns the time stamp of the longest duration. + */ + public long getTimeOfLongestDuration() { + return timeOfLongestDuration; + } + + /** + * Returns the time stamp of the shortest duration. + */ + public long getTimeOfShortestDuration() { + return timeOfShortestDuration; + } + + /** + * Resets all statistics back to their defaults. + */ + public void reset() { + totalDurations.set(0); + longestDuration.set(0); + timeOfLongestDuration = 0; + shortestDuration.set(Long.MAX_VALUE); + timeOfShortestDuration = 0; + averageDuration.set(0.0); + } + + /** + * Returns the average duration as a displayable String with units, e.g. "12.34 ms". + */ + public String getDisplayableAverageDuration() { + return formatDuration(getAverageDuration(), 0); + } + + /** + * Returns the shortest duration as a displayable String with units and the date/time at + * which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24". + */ + public String getDisplayableShortestDuration() { + return formatDuration(getShortestDuration(), getTimeOfShortestDuration()); + } + + /** + * Returns the longest duration as a displayable String with units and the date/time at + * which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24". + */ + public String getDisplayableLongestDuration() { + return formatDuration(getLongestDuration(), getTimeOfLongestDuration()); + } + + private String formatDuration(double duration, long timeStamp) { + TimeUnit unit = chooseUnit((long)duration); + double value = duration / NANOSECONDS.convert(1, unit); + return timeStamp > 0 ? + String.format("%.4g %s at %3$tD %3$tT", value, abbreviate(unit), new Date(timeStamp)) : + String.format("%.4g %s", value, abbreviate(unit)); + } + + private static TimeUnit chooseUnit(long nanos) { + if(SECONDS.convert(nanos, NANOSECONDS) > 0) { + return SECONDS; + } + + if(MILLISECONDS.convert(nanos, NANOSECONDS) > 0) { + return MILLISECONDS; + } + + if(MICROSECONDS.convert(nanos, NANOSECONDS) > 0) { + return MICROSECONDS; + } + + return NANOSECONDS; + } + + private static String abbreviate(TimeUnit unit) { + switch(unit) { + case NANOSECONDS: + return "ns"; + case MICROSECONDS: + return "\u03bcs"; // μs + case MILLISECONDS: + return "ms"; + case SECONDS: + return "s"; + default: + return ""; + } + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java index 4936efaba1..a7dd4af009 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java @@ -14,8 +14,6 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import com.google.common.base.Objects; import com.google.common.base.Objects.ToStringHelper; import com.google.common.base.Preconditions; @@ -33,8 +31,6 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor { private static final long IDLE_TIMEOUT_IN_SEC = 60L; - private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 ); - private final ExecutorQueue executorQueue; private final String threadPrefix; @@ -76,22 +72,28 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor { executorQueue = (ExecutorQueue)super.getQueue(); rejectedTaskHandler = new RejectedTaskHandler( - executorQueue.getBackingQueue(), largestBackingQueueSize ); + executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() ); super.setRejectedExecutionHandler( rejectedTaskHandler ); } @Override public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) { + Preconditions.checkNotNull( handler ); rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler ); } + @Override + public RejectedExecutionHandler getRejectedExecutionHandler(){ + return rejectedTaskHandler.getDelegateRejectedExecutionHandler(); + } + @Override public BlockingQueue getQueue(){ return executorQueue.getBackingQueue(); } public long getLargestQueueSize() { - return largestBackingQueueSize.get(); + return ((TrackingLinkedBlockingQueue)executorQueue.getBackingQueue()).getLargestQueueSize(); } protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) { @@ -129,7 +131,7 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor { private final LinkedBlockingQueue backingQueue; ExecutorQueue( int maxBackingQueueSize ) { - backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize ); + backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize ); } LinkedBlockingQueue getBackingQueue() { @@ -189,20 +191,23 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor { private static class RejectedTaskHandler implements RejectedExecutionHandler { private final LinkedBlockingQueue backingQueue; - private final AtomicLong largestBackingQueueSize; private volatile RejectedExecutionHandler delegateRejectedExecutionHandler; RejectedTaskHandler( LinkedBlockingQueue backingQueue, - AtomicLong largestBackingQueueSize ) { + RejectedExecutionHandler delegateRejectedExecutionHandler ) { this.backingQueue = backingQueue; - this.largestBackingQueueSize = largestBackingQueueSize; + this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler; } void setDelegateRejectedExecutionHandler( - RejectedExecutionHandler delegateRejectedExecutionHandler ){ + RejectedExecutionHandler delegateRejectedExecutionHandler ) { this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler; } + RejectedExecutionHandler getDelegateRejectedExecutionHandler(){ + return delegateRejectedExecutionHandler; + } + @Override public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) { if( executor.isShutdown() ) { @@ -210,19 +215,7 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor { } if( !backingQueue.offer( task ) ) { - if( delegateRejectedExecutionHandler != null ) { - delegateRejectedExecutionHandler.rejectedExecution( task, executor ); - } else { - throw new RejectedExecutionException( - "All threads are in use and the queue is full" ); - } - } - - largestBackingQueueSize.incrementAndGet(); - long size = backingQueue.size(); - long largest = largestBackingQueueSize.get(); - if( size > largest ) { - largestBackingQueueSize.compareAndSet( largest, size ); + delegateRejectedExecutionHandler.rejectedExecution( task, executor ); } } } diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandler.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandler.java new file mode 100644 index 0000000000..ab010c964d --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandler.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + +import org.opendaylight.yangtools.util.ExecutorServiceUtil; + +import com.google.common.base.Preconditions; + +/** + * A RejectedExecutionHandler that delegates to a backing RejectedExecutionHandler and counts the + * number of rejected tasks. + * + * @author Thomas Pantelis + */ +public class CountingRejectedExecutionHandler implements RejectedExecutionHandler { + + private final RejectedExecutionHandler delegate; + private final AtomicLong rejectedTaskCounter = new AtomicLong(); + + /** + * Constructor. + * + * @param delegate the backing RejectedExecutionHandler. + */ + public CountingRejectedExecutionHandler( RejectedExecutionHandler delegate ) { + this.delegate = Preconditions.checkNotNull( delegate ); + } + + @Override + public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) { + rejectedTaskCounter.incrementAndGet(); + delegate.rejectedExecution( task, executor ); + } + + /** + * Returns the rejected task count. + */ + public long getRejectedTaskCount(){ + return rejectedTaskCounter.get(); + } + + /** + * Returns s counting handler for rejected tasks that runs the rejected task directly in the + * calling thread of the execute method, unless the executor has been shut down, in which case + * the task is discarded. + */ + public static CountingRejectedExecutionHandler newCallerRunsPolicy() { + return new CountingRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); + } + + /** + * Returns a counting handler for rejected tasks that throws a RejectedExecutionException. + */ + public static CountingRejectedExecutionHandler newAbortPolicy() { + return new CountingRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() ); + } + + /** + * Returns a counting handler for rejected tasks that that blocks on the + * {@link ThreadPoolExecutor}'s backing queue until it can add the task to the queue. + */ + public static CountingRejectedExecutionHandler newCallerWaitsPolicy() { + return new CountingRejectedExecutionHandler( ExecutorServiceUtil.waitInQueueExecutionHandler() ); + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java index b7549eb24e..bf92ca5d9b 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java @@ -8,7 +8,6 @@ package org.opendaylight.yangtools.util.concurrent; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -70,7 +69,7 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor { // reached, subsequent tasks will be queued. If the queue is full, tasks will be rejected. super( maximumPoolSize, maximumPoolSize, keepAliveTime, unit, - new LinkedBlockingQueue( maximumQueueSize ) ); + new TrackingLinkedBlockingQueue( maximumQueueSize ) ); this.threadPrefix = threadPrefix; this.maximumQueueSize = maximumQueueSize; @@ -82,6 +81,12 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor { // Need to specifically configure core threads to timeout. allowCoreThreadTimeOut( true ); } + + setRejectedExecutionHandler( CountingRejectedExecutionHandler.newAbortPolicy() ); + } + + public long getLargestQueueSize() { + return ((TrackingLinkedBlockingQueue)getQueue()).getLargestQueueSize(); } protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) { @@ -96,6 +101,7 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor { .add( "Largest Thread Pool Size", getLargestPoolSize() ) .add( "Max Thread Pool Size", getMaximumPoolSize() ) .add( "Current Queue Size", getQueue().size() ) + .add( "Largest Queue Size", getLargestQueueSize() ) .add( "Max Queue Size", maximumQueueSize ) .add( "Active Thread Count", getActiveCount() ) .add( "Completed Task Count", getCompletedTaskCount() ) diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ListenerNotificationQueueStats.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ListenerNotificationQueueStats.java new file mode 100644 index 0000000000..a8edd9ab46 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ListenerNotificationQueueStats.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.beans.ConstructorProperties; + +/** + * Class used by the {@link QueuedNotificationManager} that contains a snapshot of notification + * queue statistics for a listener. + * + * @author Thomas Pantelis + * @see QueuedNotificationManager + */ +public class ListenerNotificationQueueStats { + + private final String listenerClassName; + private final int currentQueueSize; + + @ConstructorProperties({"listenerClassName","currentQueueSize"}) + public ListenerNotificationQueueStats( String listenerClassName, int currentQueueSize ) { + this.listenerClassName = listenerClassName; + this.currentQueueSize = currentQueueSize; + } + + /** + * Returns the name of the listener class. + */ + public String getListenerClassName(){ + return listenerClassName; + } + + /** + * Returns the current notification queue size. + */ + public int getCurrentQueueSize(){ + return currentQueueSize; + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java index a939840d62..27c81a1ee1 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java @@ -8,7 +8,9 @@ package org.opendaylight.yangtools.util.concurrent; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -182,6 +184,35 @@ public class QueuedNotificationManager implements NotificationManager } } + /** + * Returns {@link ListenerNotificationQueueStats} instances for each current listener + * notification task in progress. + */ + public List getListenerNotificationQueueStats() { + List statsList = new ArrayList<>( listenerCache.size() ); + for( NotificationTask task: listenerCache.values() ) { + statsList.add( new ListenerNotificationQueueStats( + task.listenerKey.getListener().getClass().getName(), + task.notificationQueue.size() ) ); + } + + return statsList ; + } + + /** + * Returns the maximum listener queue capacity. + */ + public int getMaxQueueCapacity(){ + return maxQueueCapacity; + } + + /** + * Returns the {@link Executor} to used for notification tasks. + */ + public Executor getExecutor(){ + return executor; + } + /** * Used as the listenerCache map key. We key by listener reference identity hashCode/equals. * Since we don't know anything about the listener class implementations and we're mixing diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java index 0548d7a091..110ac1eeda 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java @@ -9,7 +9,6 @@ package org.opendaylight.yangtools.util.concurrent; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -74,7 +73,7 @@ public final class SpecialExecutors { FastThreadPoolExecutor executor = new FastThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix ); - executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); + executor.setRejectedExecutionHandler( CountingRejectedExecutionHandler.newCallerRunsPolicy() ); return executor; } @@ -130,7 +129,7 @@ public final class SpecialExecutors { CachedThreadPoolExecutor executor = new CachedThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix ); - executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() ); + executor.setRejectedExecutionHandler( CountingRejectedExecutionHandler.newCallerRunsPolicy() ); return executor; } diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueue.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueue.java new file mode 100644 index 0000000000..38b5d9017f --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueue.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +/** + * A {@link LinkedBlockingQueue} that tracks the largest queue size for debugging. + * + * @author Thomas Pantelis + * + * @param the element t.ype + */ +public class TrackingLinkedBlockingQueue extends LinkedBlockingQueue { + + private static final long serialVersionUID = 1L; + + /** + * Holds largestQueueSize, this long field should be only accessed + * using {@value #LARGEST_QUEUE_SIZE_UPDATER} + */ + private volatile long largestQueueSize = 0; + + @SuppressWarnings("rawtypes") + private static AtomicLongFieldUpdater LARGEST_QUEUE_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(TrackingLinkedBlockingQueue.class, "largestQueueSize"); + + /** + * @see LinkedBlockingQueue#LinkedBlockingQueue + */ + public TrackingLinkedBlockingQueue() { + super(); + } + + /** + * @see LinkedBlockingQueue#LinkedBlockingQueue(Collection) + */ + public TrackingLinkedBlockingQueue( Collection c ) { + super(c); + } + + /** + * @see LinkedBlockingQueue#LinkedBlockingQueue(int) + */ + public TrackingLinkedBlockingQueue( int capacity ) { + super(capacity); + } + + /** + * Returns the largest queue size. + */ + public long getLargestQueueSize(){ + return largestQueueSize; + } + + @Override + public boolean offer( E e, long timeout, TimeUnit unit ) throws InterruptedException { + if( super.offer( e, timeout, unit ) ) { + updateLargestQueueSize(); + return true; + } + + return false; + } + + @Override + public boolean offer( E e ) { + if( super.offer( e ) ) { + updateLargestQueueSize(); + return true; + } + + return false; + } + + @Override + public void put( E e ) throws InterruptedException { + super.put( e ); + updateLargestQueueSize(); + } + + @Override + public boolean add( E e ) { + boolean result = super.add( e ); + updateLargestQueueSize(); + return result; + } + + @Override + public boolean addAll( Collection c ) { + try { + return super.addAll( c ); + } finally { + updateLargestQueueSize(); + } + } + + private void updateLargestQueueSize() { + long size = size(); + long largest = largestQueueSize; + if( size > largest ) { + LARGEST_QUEUE_SIZE_UPDATER.compareAndSet(this, largest, size ); + } + } +} diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/DurationStatsTrackerTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/DurationStatsTrackerTest.java new file mode 100644 index 0000000000..70d63df2dc --- /dev/null +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/DurationStatsTrackerTest.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Unit tests for DurationStatsTracker. + * + * @author Thomas Pantelis + */ +public class DurationStatsTrackerTest { + + @Test + public void test() { + + DurationStatsTracker tracker = new DurationStatsTracker(); + + tracker.addDuration(10000); + assertEquals("getTotalDurations", 1, tracker.getTotalDurations()); + assertEquals("getAverageDuration", 10000.0, tracker.getAverageDuration(), 0.1); + assertEquals("getLongestDuration", 10000, tracker.getLongestDuration()); + assertEquals("getShortestDuration", 10000, tracker.getShortestDuration()); + + tracker.addDuration(30000); + assertEquals("getTotalDurations", 2, tracker.getTotalDurations()); + assertEquals("getAverageDuration", 20000.0, tracker.getAverageDuration(), 0.1); + assertEquals("getLongestDuration", 30000, tracker.getLongestDuration()); + assertEquals("getShortestDuration", 10000, tracker.getShortestDuration()); + + verifyDisplayableString("getDisplayableAverageDuration", + tracker.getDisplayableAverageDuration(), "20.0"); + verifyDisplayableString("getDisplayableLongestDuration", + tracker.getDisplayableLongestDuration(), "30.0"); + verifyDisplayableString("getDisplayableShortestDuration", + tracker.getDisplayableShortestDuration(), "10.0"); + + tracker.addDuration(10000); + assertEquals("getTotalDurations", 3, tracker.getTotalDurations()); + assertEquals("getAverageDuration", 16666.0, tracker.getAverageDuration(), 1.0); + assertEquals("getLongestDuration", 30000, tracker.getLongestDuration()); + assertEquals("getShortestDuration", 10000, tracker.getShortestDuration()); + + tracker.addDuration(5000); + assertEquals("getTotalDurations", 4, tracker.getTotalDurations()); + assertEquals("getAverageDuration", 13750.0, tracker.getAverageDuration(), 1.0); + assertEquals("getLongestDuration", 30000, tracker.getLongestDuration()); + assertEquals("getShortestDuration", 5000, tracker.getShortestDuration()); + + tracker.reset(); + assertEquals("getTotalDurations", 0, tracker.getTotalDurations()); + assertEquals("getAverageDuration", 0.0, tracker.getAverageDuration(), 0.1); + assertEquals("getLongestDuration", 0, tracker.getLongestDuration()); + assertEquals("getShortestDuration", 0, tracker.getShortestDuration()); + + tracker.addDuration(10000); + assertEquals("getTotalDurations", 1, tracker.getTotalDurations()); + assertEquals("getAverageDuration", 10000.0, tracker.getAverageDuration(), 0.1); + assertEquals("getLongestDuration", 10000, tracker.getLongestDuration()); + assertEquals("getShortestDuration", 10000, tracker.getShortestDuration()); + } + + private void verifyDisplayableString(String name, String actual, String expPrefix) { + assertEquals(name + " starts with " + expPrefix + ". Actual: " + actual, + true, actual.startsWith(expPrefix)); + } +} diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandlerTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandlerTest.java new file mode 100644 index 0000000000..42c939ab34 --- /dev/null +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandlerTest.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Test; +import org.opendaylight.yangtools.util.ExecutorServiceUtil; +import org.opendaylight.yangtools.util.concurrent.ThreadPoolExecutorTest.Task; + +/** + * Unit tests for CountingRejectedExecutionHandler. + * + * @author Thomas Pantelis + */ +public class CountingRejectedExecutionHandlerTest { + + private ThreadPoolExecutor executor; + + @After + public void tearDown() { + if( executor != null ) { + executor.shutdownNow(); + } + } + + @Test + public void testCallerRunsPolicyHandler() throws InterruptedException { + + int nTasks = 5; + CountDownLatch tasksRunLatch = new CountDownLatch( 1 ); + CountDownLatch blockLatch = new CountDownLatch( 1 ); + + executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS, + ExecutorServiceUtil.offerFailingBlockingQueue( new LinkedBlockingQueue() ) ); + + CountingRejectedExecutionHandler countingHandler = + CountingRejectedExecutionHandler.newCallerRunsPolicy(); + executor.setRejectedExecutionHandler( countingHandler ); + + executor.execute( new Task( tasksRunLatch, blockLatch ) ); + + for( int i = 0; i < nTasks - 1; i++ ) { + executor.execute( new Task( null, null, null, null, 0 ) ); + } + + assertEquals( "getRejectedTaskCount", nTasks - 1, countingHandler.getRejectedTaskCount() ); + + blockLatch.countDown(); + + assertEquals( "Tasks complete", true, tasksRunLatch.await( 5, TimeUnit.SECONDS ) ); + } + + @Test + public void testAbortPolicyHandler() throws InterruptedException { + + int nTasks = 5; + CountDownLatch tasksRunLatch = new CountDownLatch( 1 ); + CountDownLatch blockLatch = new CountDownLatch( 1 ); + + executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS, + ExecutorServiceUtil.offerFailingBlockingQueue( new LinkedBlockingQueue() ) ); + + CountingRejectedExecutionHandler countingHandler = + CountingRejectedExecutionHandler.newAbortPolicy(); + executor.setRejectedExecutionHandler( countingHandler ); + + executor.execute( new Task( tasksRunLatch, blockLatch ) ); + + for( int i = 0; i < nTasks - 1; i++ ) { + try { + executor.execute( new Task( null, null, null, null, 0 ) ); + fail( "Expected RejectedExecutionException" ); + } catch( RejectedExecutionException e ) { + // Expected + } + } + + assertEquals( "getRejectedTaskCount", nTasks - 1, countingHandler.getRejectedTaskCount() ); + + blockLatch.countDown(); + + assertEquals( "Tasks complete", true, tasksRunLatch.await( 5, TimeUnit.SECONDS ) ); + } +} diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java index 8270e45d35..4d280536a1 100644 --- a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java @@ -148,8 +148,9 @@ public class ThreadPoolExecutorTest { System.out.println(); } - private static class Task implements Runnable { + static class Task implements Runnable { final CountDownLatch tasksRunLatch; + final CountDownLatch blockLatch; final ConcurrentMap taskCountPerThread; final AtomicReference threadError; final String expThreadPrefix; @@ -162,16 +163,28 @@ public class ThreadPoolExecutorTest { this.threadError = threadError; this.expThreadPrefix = expThreadPrefix; this.delay = delay; + blockLatch = null; + } + + Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) { + this.tasksRunLatch = tasksRunLatch; + this.blockLatch = blockLatch; + this.taskCountPerThread = null; + this.threadError = null; + this.expThreadPrefix = null; + this.delay = 0; } @Override public void run() { try { - if( delay > 0 ) { - try { + try { + if( delay > 0 ) { TimeUnit.MICROSECONDS.sleep( delay ); - } catch( InterruptedException e ) {} - } + } else if( blockLatch != null ) { + blockLatch.await(); + } + } catch( InterruptedException e ) {} if( expThreadPrefix != null ) { assertEquals( "Thread name starts with " + expThreadPrefix, true, diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueueTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueueTest.java new file mode 100644 index 0000000000..e1119c3716 --- /dev/null +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueueTest.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.yangtools.util.concurrent; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +/** + * Unit tests for TrackingLinkedBlockingQueue. + * + * @author Thomas Pantelis + */ +public class TrackingLinkedBlockingQueueTest { + + @Test + public void testOffer() throws InterruptedException { + + TrackingLinkedBlockingQueue queue = new TrackingLinkedBlockingQueue<>( 2 ); + + assertEquals( "offer", true, queue.offer( "1" ) ); + assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() ); + assertEquals( "size", 1, queue.size() ); + + assertEquals( "offer", true, queue.offer( "2", 1, TimeUnit.MILLISECONDS ) ); + assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() ); + assertEquals( "size", 2, queue.size() ); + + assertEquals( "offer", false, queue.offer( "3" ) ); + assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() ); + assertEquals( "size", 2, queue.size() ); + + assertEquals( "offer", false, queue.offer( "4", 1, TimeUnit.MILLISECONDS ) ); + assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() ); + assertEquals( "size", 2, queue.size() ); + } + + @Test + public void testPut() throws InterruptedException { + + TrackingLinkedBlockingQueue queue = new TrackingLinkedBlockingQueue<>(); + + queue.put( "1" ); + assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() ); + assertEquals( "size", 1, queue.size() ); + + queue.put( "2" ); + assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() ); + assertEquals( "size", 2, queue.size() ); + } + + @Test + public void testAdd() { + + TrackingLinkedBlockingQueue queue = new TrackingLinkedBlockingQueue<>( 2 ); + + assertEquals( "add", true, queue.add( "1" ) ); + assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() ); + assertEquals( "size", 1, queue.size() ); + + assertEquals( "add", true, queue.add( "2" ) ); + assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() ); + assertEquals( "size", 2, queue.size() ); + + try { + queue.add( "3" ); + fail( "Expected IllegalStateException" ); + } catch( IllegalStateException e ) { + // Expected + assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() ); + assertEquals( "size", 2, queue.size() ); + } + } + + @Test + public void testAddAll() { + + TrackingLinkedBlockingQueue queue = new TrackingLinkedBlockingQueue<>( 3 ); + + queue.addAll( Arrays.asList( "1", "2" ) ); + assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() ); + assertEquals( "size", 2, queue.size() ); + + try { + queue.addAll( Arrays.asList( "3", "4" ) ); + fail( "Expected IllegalStateException" ); + } catch( IllegalStateException e ) { + // Expected + assertEquals( "getLargestQueueSize", 3, queue.getLargestQueueSize() ); + assertEquals( "size", 3, queue.size() ); + } + } +} -- 2.36.6