Merge "Bug 1446: Add new concurrent classes for tracking stats"
authorTony Tkacik <ttkacik@cisco.com>
Fri, 29 Aug 2014 20:47:04 +0000 (20:47 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 29 Aug 2014 20:47:04 +0000 (20:47 +0000)
13 files changed:
common/util/pom.xml
common/util/src/main/java/org/opendaylight/yangtools/util/DurationStatsTracker.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandler.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ListenerNotificationQueueStats.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueue.java [new file with mode: 0644]
common/util/src/test/java/org/opendaylight/yangtools/util/DurationStatsTrackerTest.java [new file with mode: 0644]
common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandlerTest.java [new file with mode: 0644]
common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/ThreadPoolExecutorTest.java
common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueueTest.java [new file with mode: 0644]

index 5d185efe517345ac63c832837e7f3db06dd3e7c4..f3502891baa6b585d5460ef3f2300ed2d65d88f8 100644 (file)
                 <extensions>true</extensions>
                 <configuration>
                     <instructions>
-                        <Export-Package>
-                          org.opendaylight.yangtools.util,
-                          org.opendaylight.yangtools.util.concurrent
-                        </Export-Package>
                         <Embed-Dependency>java-concurrent-hash-trie-map;inline=true</Embed-Dependency>
                     </instructions>
                 </configuration>
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/DurationStatsTracker.java b/common/util/src/main/java/org/opendaylight/yangtools/util/DurationStatsTracker.java
new file mode 100644 (file)
index 0000000..45ebd01
--- /dev/null
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+/**
+ * Class that calculates and tracks time duration statistics.
+ *
+ * @author Thomas Pantelis
+ */
+public class DurationStatsTracker {
+
+    private final AtomicLong totalDurations = new AtomicLong();
+    private final AtomicLong longestDuration = new AtomicLong();
+    private volatile long timeOfLongestDuration;
+    private final AtomicLong shortestDuration = new AtomicLong(Long.MAX_VALUE);
+    private volatile long timeOfShortestDuration;
+    private final AtomicDouble averageDuration = new AtomicDouble();
+
+    /**
+     * Add a duration to track.
+     *
+     * @param duration the duration in nanoseconds.
+     */
+    public void addDuration(long duration) {
+
+        double currentAve = averageDuration.get();
+        long currentTotal = totalDurations.get();
+
+        long newTotal = currentTotal + 1;
+
+        // Calculate moving cumulative average.
+        double newAve = currentAve * currentTotal / newTotal + duration / newTotal;
+
+        averageDuration.compareAndSet(currentAve, newAve);
+        totalDurations.compareAndSet(currentTotal, newTotal);
+
+        long longest = longestDuration.get();
+        if( duration > longest ) {
+            if(longestDuration.compareAndSet( longest, duration )) {
+                timeOfLongestDuration = System.currentTimeMillis();
+            }
+        }
+
+        long shortest = shortestDuration.get();
+        if( duration < shortest ) {
+            if(shortestDuration.compareAndSet( shortest, duration )) {
+                timeOfShortestDuration = System.currentTimeMillis();
+            }
+        }
+    }
+
+    /**
+     * Returns the total number of tracked durations.
+     */
+    public long getTotalDurations() {
+        return totalDurations.get();
+    }
+
+    /**
+     * Returns the longest duration in nanoseconds.
+     */
+    public long getLongestDuration() {
+        return longestDuration.get();
+    }
+
+    /**
+     * Returns the shortest duration in nanoseconds.
+     */
+    public long getShortestDuration() {
+        long shortest = shortestDuration.get();
+        return shortest < Long.MAX_VALUE ? shortest : 0;
+    }
+
+    /**
+     * Returns the average duration in nanoseconds.
+     */
+    public double getAverageDuration() {
+        return averageDuration.get();
+    }
+
+    /**
+     * Returns the time stamp of the longest duration.
+     */
+    public long getTimeOfLongestDuration() {
+        return timeOfLongestDuration;
+    }
+
+    /**
+     * Returns the time stamp of the shortest duration.
+     */
+    public long getTimeOfShortestDuration() {
+        return timeOfShortestDuration;
+    }
+
+    /**
+     * Resets all statistics back to their defaults.
+     */
+    public void reset() {
+        totalDurations.set(0);
+        longestDuration.set(0);
+        timeOfLongestDuration = 0;
+        shortestDuration.set(Long.MAX_VALUE);
+        timeOfShortestDuration = 0;
+        averageDuration.set(0.0);
+    }
+
+    /**
+     * Returns the average duration as a displayable String with units, e.g. "12.34 ms".
+     */
+    public String getDisplayableAverageDuration() {
+        return formatDuration(getAverageDuration(), 0);
+    }
+
+    /**
+     * Returns the shortest duration as a displayable String with units and the date/time at
+     * which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24".
+     */
+    public String getDisplayableShortestDuration() {
+        return formatDuration(getShortestDuration(), getTimeOfShortestDuration());
+    }
+
+    /**
+     * Returns the longest duration as a displayable String with units and the date/time at
+     * which it occurred, e.g. "12.34 ms at 08/02/2014 12:30:24".
+     */
+    public String getDisplayableLongestDuration() {
+        return formatDuration(getLongestDuration(), getTimeOfLongestDuration());
+    }
+
+    private String formatDuration(double duration, long timeStamp) {
+        TimeUnit unit = chooseUnit((long)duration);
+        double value = duration / NANOSECONDS.convert(1, unit);
+        return timeStamp > 0 ?
+                String.format("%.4g %s at %3$tD %3$tT", value, abbreviate(unit), new Date(timeStamp)) :
+                String.format("%.4g %s", value, abbreviate(unit));
+    }
+
+    private static TimeUnit chooseUnit(long nanos) {
+        if(SECONDS.convert(nanos, NANOSECONDS) > 0) {
+            return SECONDS;
+        }
+
+        if(MILLISECONDS.convert(nanos, NANOSECONDS) > 0) {
+            return MILLISECONDS;
+        }
+
+        if(MICROSECONDS.convert(nanos, NANOSECONDS) > 0) {
+            return MICROSECONDS;
+        }
+
+        return NANOSECONDS;
+    }
+
+    private static String abbreviate(TimeUnit unit) {
+        switch(unit) {
+            case NANOSECONDS:
+                return "ns";
+            case MICROSECONDS:
+                return "\u03bcs"; // μs
+            case MILLISECONDS:
+                return "ms";
+            case SECONDS:
+                return "s";
+            default:
+                return "";
+        }
+    }
+}
index 4936efaba132fa150d60fa3c316975f6d81b6746..a7dd4af009e06457925d305483c1bcb048455c62 100644 (file)
@@ -14,8 +14,6 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
@@ -33,8 +31,6 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
 
     private static final long IDLE_TIMEOUT_IN_SEC = 60L;
 
-    private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 );
-
     private final ExecutorQueue executorQueue;
 
     private final String threadPrefix;
@@ -76,22 +72,28 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         executorQueue = (ExecutorQueue)super.getQueue();
 
         rejectedTaskHandler = new RejectedTaskHandler(
-                executorQueue.getBackingQueue(), largestBackingQueueSize );
+                executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
         super.setRejectedExecutionHandler( rejectedTaskHandler );
     }
 
     @Override
     public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
+        Preconditions.checkNotNull( handler );
         rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
     }
 
+    @Override
+    public RejectedExecutionHandler getRejectedExecutionHandler(){
+        return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
+    }
+
     @Override
     public BlockingQueue<Runnable> getQueue(){
         return executorQueue.getBackingQueue();
     }
 
     public long getLargestQueueSize() {
-        return largestBackingQueueSize.get();
+        return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
     }
 
     protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
@@ -129,7 +131,7 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         private final LinkedBlockingQueue<Runnable> backingQueue;
 
         ExecutorQueue( int maxBackingQueueSize ) {
-            backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize );
+            backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
         }
 
         LinkedBlockingQueue<Runnable> getBackingQueue() {
@@ -189,20 +191,23 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
     private static class RejectedTaskHandler implements RejectedExecutionHandler {
 
         private final LinkedBlockingQueue<Runnable> backingQueue;
-        private final AtomicLong largestBackingQueueSize;
         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
 
         RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
-                             AtomicLong largestBackingQueueSize ) {
+                             RejectedExecutionHandler delegateRejectedExecutionHandler ) {
             this.backingQueue = backingQueue;
-            this.largestBackingQueueSize = largestBackingQueueSize;
+            this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
         void setDelegateRejectedExecutionHandler(
-                RejectedExecutionHandler delegateRejectedExecutionHandler ){
+                RejectedExecutionHandler delegateRejectedExecutionHandler ) {
             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
+        RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+            return delegateRejectedExecutionHandler;
+        }
+
         @Override
         public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
             if( executor.isShutdown() ) {
@@ -210,19 +215,7 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
             }
 
             if( !backingQueue.offer( task ) ) {
-                if( delegateRejectedExecutionHandler != null ) {
-                    delegateRejectedExecutionHandler.rejectedExecution( task, executor );
-                } else {
-                    throw new RejectedExecutionException(
-                                                "All threads are in use and the queue is full" );
-                }
-            }
-
-            largestBackingQueueSize.incrementAndGet();
-            long size = backingQueue.size();
-            long largest = largestBackingQueueSize.get();
-            if( size > largest ) {
-                largestBackingQueueSize.compareAndSet( largest, size );
+                delegateRejectedExecutionHandler.rejectedExecution( task, executor );
             }
         }
     }
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandler.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandler.java
new file mode 100644 (file)
index 0000000..ab010c9
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A RejectedExecutionHandler that delegates to a backing RejectedExecutionHandler and counts the
+ * number of rejected tasks.
+ *
+ * @author Thomas Pantelis
+ */
+public class CountingRejectedExecutionHandler implements RejectedExecutionHandler {
+
+    private final RejectedExecutionHandler delegate;
+    private final AtomicLong rejectedTaskCounter = new AtomicLong();
+
+    /**
+     * Constructor.
+     *
+     * @param delegate the backing RejectedExecutionHandler.
+     */
+    public CountingRejectedExecutionHandler( RejectedExecutionHandler delegate ) {
+        this.delegate = Preconditions.checkNotNull( delegate );
+    }
+
+    @Override
+    public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
+        rejectedTaskCounter.incrementAndGet();
+        delegate.rejectedExecution( task, executor );
+    }
+
+    /**
+     * Returns the rejected task count.
+     */
+    public long getRejectedTaskCount(){
+        return rejectedTaskCounter.get();
+    }
+
+    /**
+     * Returns s counting handler for rejected tasks that runs the rejected task directly in the
+     * calling thread of the execute method, unless the executor has been shut down, in which case
+     * the task is discarded.
+     */
+    public static CountingRejectedExecutionHandler newCallerRunsPolicy() {
+        return new CountingRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
+    }
+
+    /**
+     * Returns a counting handler for rejected tasks that throws a RejectedExecutionException.
+     */
+    public static CountingRejectedExecutionHandler newAbortPolicy() {
+        return new CountingRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy() );
+    }
+
+    /**
+     * Returns a counting handler for rejected tasks that that blocks on the
+     * {@link ThreadPoolExecutor}'s backing queue until it can add the task to the queue.
+     */
+    public static CountingRejectedExecutionHandler newCallerWaitsPolicy() {
+        return new CountingRejectedExecutionHandler( ExecutorServiceUtil.waitInQueueExecutionHandler() );
+    }
+}
index b7549eb24e7fc602293bce054ba5ed069ecc6c1f..bf92ca5d9b47037a2366429ec8007b9fd176a2d0 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -70,7 +69,7 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor {
         // reached, subsequent tasks will be queued. If the queue is full, tasks will be rejected.
 
         super( maximumPoolSize, maximumPoolSize, keepAliveTime, unit,
-               new LinkedBlockingQueue<Runnable>( maximumQueueSize ) );
+               new TrackingLinkedBlockingQueue<Runnable>( maximumQueueSize ) );
 
         this.threadPrefix = threadPrefix;
         this.maximumQueueSize = maximumQueueSize;
@@ -82,6 +81,12 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor {
             // Need to specifically configure core threads to timeout.
             allowCoreThreadTimeOut( true );
         }
+
+        setRejectedExecutionHandler( CountingRejectedExecutionHandler.newAbortPolicy() );
+    }
+
+    public long getLargestQueueSize() {
+        return ((TrackingLinkedBlockingQueue<?>)getQueue()).getLargestQueueSize();
     }
 
     protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
@@ -96,6 +101,7 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor {
                 .add( "Largest Thread Pool Size", getLargestPoolSize() )
                 .add( "Max Thread Pool Size", getMaximumPoolSize() )
                 .add( "Current Queue Size", getQueue().size() )
+                .add( "Largest Queue Size", getLargestQueueSize() )
                 .add( "Max Queue Size", maximumQueueSize )
                 .add( "Active Thread Count", getActiveCount() )
                 .add( "Completed Task Count", getCompletedTaskCount() )
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ListenerNotificationQueueStats.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ListenerNotificationQueueStats.java
new file mode 100644 (file)
index 0000000..a8edd9a
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import java.beans.ConstructorProperties;
+
+/**
+ * Class used by the {@link QueuedNotificationManager} that contains a snapshot of notification
+ * queue statistics for a listener.
+ *
+ * @author Thomas Pantelis
+ * @see QueuedNotificationManager
+ */
+public class ListenerNotificationQueueStats {
+
+    private final String listenerClassName;
+    private final int currentQueueSize;
+
+    @ConstructorProperties({"listenerClassName","currentQueueSize"})
+    public ListenerNotificationQueueStats( String listenerClassName, int currentQueueSize ) {
+        this.listenerClassName = listenerClassName;
+        this.currentQueueSize = currentQueueSize;
+    }
+
+    /**
+     * Returns the name of the listener class.
+     */
+    public String getListenerClassName(){
+        return listenerClassName;
+    }
+
+    /**
+     * Returns the current notification queue size.
+     */
+    public int getCurrentQueueSize(){
+        return currentQueueSize;
+    }
+}
index a939840d6255497b47a9356d5f8b133b52c61290..27c81a1ee1969436c03a1405a81544c32109e6e0 100644 (file)
@@ -8,7 +8,9 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -182,6 +184,35 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
         }
     }
 
+    /**
+     * Returns {@link ListenerNotificationQueueStats} instances for each current listener
+     * notification task in progress.
+     */
+    public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
+        List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
+        for( NotificationTask task: listenerCache.values() ) {
+            statsList.add( new ListenerNotificationQueueStats(
+                    task.listenerKey.getListener().getClass().getName(),
+                    task.notificationQueue.size() ) );
+        }
+
+        return statsList ;
+    }
+
+    /**
+     * Returns the maximum listener queue capacity.
+     */
+    public int getMaxQueueCapacity(){
+        return maxQueueCapacity;
+    }
+
+    /**
+     * Returns the {@link Executor} to used for notification tasks.
+     */
+    public Executor getExecutor(){
+        return executor;
+    }
+
     /**
      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
      * Since we don't know anything about the listener class implementations and we're mixing
index 0548d7a09142cf8195e377c778b1776756ff4889..110ac1eedab6e39bcce03955ef2ee471b143cc8e 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.yangtools.util.concurrent;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -74,7 +73,7 @@ public final class SpecialExecutors {
 
         FastThreadPoolExecutor executor =
                 new FastThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix );
-        executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
+        executor.setRejectedExecutionHandler( CountingRejectedExecutionHandler.newCallerRunsPolicy() );
         return executor;
     }
 
@@ -130,7 +129,7 @@ public final class SpecialExecutors {
 
         CachedThreadPoolExecutor executor =
                 new CachedThreadPoolExecutor( maximumPoolSize, maximumQueueSize, threadPrefix );
-        executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy() );
+        executor.setRejectedExecutionHandler( CountingRejectedExecutionHandler.newCallerRunsPolicy() );
         return executor;
     }
 
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueue.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueue.java
new file mode 100644 (file)
index 0000000..38b5d90
--- /dev/null
@@ -0,0 +1,114 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+/**
+ * A {@link LinkedBlockingQueue} that tracks the largest queue size for debugging.
+ *
+ * @author Thomas Pantelis
+ *
+ * @param <E> the element t.ype
+ */
+public class TrackingLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Holds largestQueueSize, this long field should be only accessed
+     * using {@value #LARGEST_QUEUE_SIZE_UPDATER}
+     */
+    private volatile long largestQueueSize = 0;
+
+    @SuppressWarnings("rawtypes")
+    private static AtomicLongFieldUpdater<TrackingLinkedBlockingQueue> LARGEST_QUEUE_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(TrackingLinkedBlockingQueue.class, "largestQueueSize");
+
+    /**
+     * @see LinkedBlockingQueue#LinkedBlockingQueue
+     */
+    public TrackingLinkedBlockingQueue() {
+        super();
+    }
+
+    /**
+     * @see LinkedBlockingQueue#LinkedBlockingQueue(Collection)
+     */
+    public TrackingLinkedBlockingQueue( Collection<? extends E> c ) {
+        super(c);
+    }
+
+    /**
+     * @see LinkedBlockingQueue#LinkedBlockingQueue(int)
+     */
+    public TrackingLinkedBlockingQueue( int capacity ) {
+        super(capacity);
+    }
+
+    /**
+     * Returns the largest queue size.
+     */
+    public long getLargestQueueSize(){
+        return largestQueueSize;
+    }
+
+    @Override
+    public boolean offer( E e, long timeout, TimeUnit unit ) throws InterruptedException {
+        if( super.offer( e, timeout, unit ) ) {
+            updateLargestQueueSize();
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public boolean offer( E e ) {
+        if( super.offer( e ) ) {
+            updateLargestQueueSize();
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public void put( E e ) throws InterruptedException {
+        super.put( e );
+        updateLargestQueueSize();
+    }
+
+    @Override
+    public boolean add( E e ) {
+        boolean result = super.add( e );
+        updateLargestQueueSize();
+        return result;
+    }
+
+    @Override
+    public boolean addAll( Collection<? extends E> c ) {
+        try {
+            return super.addAll( c );
+        } finally {
+            updateLargestQueueSize();
+        }
+    }
+
+    private void updateLargestQueueSize() {
+        long size = size();
+        long largest = largestQueueSize;
+        if( size > largest ) {
+            LARGEST_QUEUE_SIZE_UPDATER.compareAndSet(this, largest, size );
+        }
+    }
+}
diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/DurationStatsTrackerTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/DurationStatsTrackerTest.java
new file mode 100644 (file)
index 0000000..70d63df
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for DurationStatsTracker.
+ *
+ * @author Thomas Pantelis
+ */
+public class DurationStatsTrackerTest {
+
+    @Test
+    public void test() {
+
+        DurationStatsTracker tracker = new DurationStatsTracker();
+
+        tracker.addDuration(10000);
+        assertEquals("getTotalDurations", 1, tracker.getTotalDurations());
+        assertEquals("getAverageDuration", 10000.0, tracker.getAverageDuration(), 0.1);
+        assertEquals("getLongestDuration", 10000, tracker.getLongestDuration());
+        assertEquals("getShortestDuration", 10000, tracker.getShortestDuration());
+
+        tracker.addDuration(30000);
+        assertEquals("getTotalDurations", 2, tracker.getTotalDurations());
+        assertEquals("getAverageDuration", 20000.0, tracker.getAverageDuration(), 0.1);
+        assertEquals("getLongestDuration", 30000, tracker.getLongestDuration());
+        assertEquals("getShortestDuration", 10000, tracker.getShortestDuration());
+
+        verifyDisplayableString("getDisplayableAverageDuration",
+                tracker.getDisplayableAverageDuration(), "20.0");
+        verifyDisplayableString("getDisplayableLongestDuration",
+                tracker.getDisplayableLongestDuration(), "30.0");
+        verifyDisplayableString("getDisplayableShortestDuration",
+                tracker.getDisplayableShortestDuration(), "10.0");
+
+        tracker.addDuration(10000);
+        assertEquals("getTotalDurations", 3, tracker.getTotalDurations());
+        assertEquals("getAverageDuration", 16666.0, tracker.getAverageDuration(), 1.0);
+        assertEquals("getLongestDuration", 30000, tracker.getLongestDuration());
+        assertEquals("getShortestDuration", 10000, tracker.getShortestDuration());
+
+        tracker.addDuration(5000);
+        assertEquals("getTotalDurations", 4, tracker.getTotalDurations());
+        assertEquals("getAverageDuration", 13750.0, tracker.getAverageDuration(), 1.0);
+        assertEquals("getLongestDuration", 30000, tracker.getLongestDuration());
+        assertEquals("getShortestDuration", 5000, tracker.getShortestDuration());
+
+        tracker.reset();
+        assertEquals("getTotalDurations", 0, tracker.getTotalDurations());
+        assertEquals("getAverageDuration", 0.0, tracker.getAverageDuration(), 0.1);
+        assertEquals("getLongestDuration", 0, tracker.getLongestDuration());
+        assertEquals("getShortestDuration", 0, tracker.getShortestDuration());
+
+        tracker.addDuration(10000);
+        assertEquals("getTotalDurations", 1, tracker.getTotalDurations());
+        assertEquals("getAverageDuration", 10000.0, tracker.getAverageDuration(), 0.1);
+        assertEquals("getLongestDuration", 10000, tracker.getLongestDuration());
+        assertEquals("getShortestDuration", 10000, tracker.getShortestDuration());
+    }
+
+    private void verifyDisplayableString(String name, String actual, String expPrefix) {
+        assertEquals(name + " starts with " + expPrefix + ". Actual: " + actual,
+                true, actual.startsWith(expPrefix));
+    }
+}
diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandlerTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/CountingRejectedExecutionHandlerTest.java
new file mode 100644 (file)
index 0000000..42c939a
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+import org.opendaylight.yangtools.util.concurrent.ThreadPoolExecutorTest.Task;
+
+/**
+ * Unit tests for CountingRejectedExecutionHandler.
+ *
+ * @author Thomas Pantelis
+ */
+public class CountingRejectedExecutionHandlerTest {
+
+    private ThreadPoolExecutor executor;
+
+    @After
+    public void tearDown() {
+        if( executor != null ) {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testCallerRunsPolicyHandler() throws InterruptedException {
+
+        int nTasks = 5;
+        CountDownLatch tasksRunLatch = new CountDownLatch( 1 );
+        CountDownLatch blockLatch = new CountDownLatch( 1 );
+
+        executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
+                ExecutorServiceUtil.offerFailingBlockingQueue( new LinkedBlockingQueue<Runnable>() ) );
+
+        CountingRejectedExecutionHandler countingHandler =
+                CountingRejectedExecutionHandler.newCallerRunsPolicy();
+        executor.setRejectedExecutionHandler( countingHandler );
+
+        executor.execute( new Task( tasksRunLatch, blockLatch ) );
+
+        for( int i = 0; i < nTasks - 1; i++ ) {
+            executor.execute( new Task( null, null, null, null, 0 ) );
+        }
+
+        assertEquals( "getRejectedTaskCount", nTasks - 1, countingHandler.getRejectedTaskCount() );
+
+        blockLatch.countDown();
+
+        assertEquals( "Tasks complete", true, tasksRunLatch.await( 5, TimeUnit.SECONDS ) );
+    }
+
+    @Test
+    public void testAbortPolicyHandler() throws InterruptedException {
+
+        int nTasks = 5;
+        CountDownLatch tasksRunLatch = new CountDownLatch( 1 );
+        CountDownLatch blockLatch = new CountDownLatch( 1 );
+
+        executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
+                ExecutorServiceUtil.offerFailingBlockingQueue( new LinkedBlockingQueue<Runnable>() ) );
+
+        CountingRejectedExecutionHandler countingHandler =
+                CountingRejectedExecutionHandler.newAbortPolicy();
+        executor.setRejectedExecutionHandler( countingHandler );
+
+        executor.execute( new Task( tasksRunLatch, blockLatch ) );
+
+        for( int i = 0; i < nTasks - 1; i++ ) {
+            try {
+                executor.execute( new Task( null, null, null, null, 0 ) );
+                fail( "Expected RejectedExecutionException" );
+            } catch( RejectedExecutionException e ) {
+                // Expected
+            }
+        }
+
+        assertEquals( "getRejectedTaskCount", nTasks - 1, countingHandler.getRejectedTaskCount() );
+
+        blockLatch.countDown();
+
+        assertEquals( "Tasks complete", true, tasksRunLatch.await( 5, TimeUnit.SECONDS ) );
+    }
+}
index 8270e45d357ca6fffd6ecdb407d84d37115155cc..4d280536a1c865b4896d764b235f69d442e86cd2 100644 (file)
@@ -148,8 +148,9 @@ public class ThreadPoolExecutorTest {
         System.out.println();
     }
 
-    private static class Task implements Runnable {
+    static class Task implements Runnable {
         final CountDownLatch tasksRunLatch;
+        final CountDownLatch blockLatch;
         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
         final AtomicReference<AssertionError> threadError;
         final String expThreadPrefix;
@@ -162,16 +163,28 @@ public class ThreadPoolExecutorTest {
             this.threadError = threadError;
             this.expThreadPrefix = expThreadPrefix;
             this.delay = delay;
+            blockLatch = null;
+        }
+
+        Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
+            this.tasksRunLatch = tasksRunLatch;
+            this.blockLatch = blockLatch;
+            this.taskCountPerThread = null;
+            this.threadError = null;
+            this.expThreadPrefix = null;
+            this.delay = 0;
         }
 
         @Override
         public void run() {
             try {
-                if( delay > 0 ) {
-                    try {
+                try {
+                    if( delay > 0 ) {
                         TimeUnit.MICROSECONDS.sleep( delay );
-                    } catch( InterruptedException e ) {}
-                }
+                    } else if( blockLatch != null ) {
+                        blockLatch.await();
+                    }
+                } catch( InterruptedException e ) {}
 
                 if( expThreadPrefix != null ) {
                     assertEquals( "Thread name starts with " + expThreadPrefix, true,
diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueueTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/TrackingLinkedBlockingQueueTest.java
new file mode 100644 (file)
index 0000000..e1119c3
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.yangtools.util.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for TrackingLinkedBlockingQueue.
+ *
+ * @author Thomas Pantelis
+ */
+public class TrackingLinkedBlockingQueueTest {
+
+    @Test
+    public void testOffer() throws InterruptedException {
+
+        TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>( 2 );
+
+        assertEquals( "offer", true, queue.offer( "1" ) );
+        assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() );
+        assertEquals( "size", 1, queue.size() );
+
+        assertEquals( "offer", true, queue.offer( "2", 1, TimeUnit.MILLISECONDS ) );
+        assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+        assertEquals( "size", 2, queue.size() );
+
+        assertEquals( "offer", false, queue.offer( "3" ) );
+        assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+        assertEquals( "size", 2, queue.size() );
+
+        assertEquals( "offer", false, queue.offer( "4", 1, TimeUnit.MILLISECONDS ) );
+        assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+        assertEquals( "size", 2, queue.size() );
+    }
+
+    @Test
+    public void testPut() throws InterruptedException {
+
+        TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>();
+
+        queue.put( "1" );
+        assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() );
+        assertEquals( "size", 1, queue.size() );
+
+        queue.put( "2" );
+        assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+        assertEquals( "size", 2, queue.size() );
+    }
+
+    @Test
+    public void testAdd() {
+
+        TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>( 2 );
+
+        assertEquals( "add", true, queue.add( "1" ) );
+        assertEquals( "getLargestQueueSize", 1, queue.getLargestQueueSize() );
+        assertEquals( "size", 1, queue.size() );
+
+        assertEquals( "add", true, queue.add( "2" ) );
+        assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+        assertEquals( "size", 2, queue.size() );
+
+        try {
+            queue.add( "3" );
+            fail( "Expected IllegalStateException" );
+        } catch( IllegalStateException e ) {
+            // Expected
+            assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+            assertEquals( "size", 2, queue.size() );
+        }
+    }
+
+    @Test
+    public void testAddAll() {
+
+        TrackingLinkedBlockingQueue<String> queue = new TrackingLinkedBlockingQueue<>( 3 );
+
+        queue.addAll( Arrays.asList( "1", "2" ) );
+        assertEquals( "getLargestQueueSize", 2, queue.getLargestQueueSize() );
+        assertEquals( "size", 2, queue.size() );
+
+        try {
+            queue.addAll( Arrays.asList( "3", "4" ) );
+            fail( "Expected IllegalStateException" );
+        } catch( IllegalStateException e ) {
+            // Expected
+            assertEquals( "getLargestQueueSize", 3, queue.getLargestQueueSize() );
+            assertEquals( "size", 3, queue.size() );
+        }
+    }
+}