BUG-1291 Fix executor for netconf-connector rejecting tasks. 49/9049/4
authorMaros Marsalek <mmarsale@cisco.com>
Wed, 16 Jul 2014 08:27:53 +0000 (10:27 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Thu, 17 Jul 2014 13:42:45 +0000 (15:42 +0200)
Flexible threadpool rejected tasks with SynchronousQueue and did not spawn threads with LinkedBlockingQueue.
Added custom Queue that makes the underlying threadpool to spawn threads as expected.

Change-Id: I28d27316cf63b3370aa802bbd31ee976aa54fbf7
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java
opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/yang/threadpool/impl/flexible/FlexibleThreadPoolModule.java
opendaylight/config/threadpool-config-impl/src/main/yang/threadpool-impl-flexible.yang
opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml

index 3dfa6e2..5036399 100644 (file)
@@ -10,15 +10,20 @@ package org.opendaylight.controller.config.threadpool.util;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.config.threadpool.ThreadPool;
 
+import com.google.common.base.Optional;
+
 /**
  * Implementation of {@link ThreadPool} using flexible number of threads wraps
  * {@link ExecutorService}.
@@ -28,12 +33,33 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
 
     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
             ThreadFactory threadFactory) {
+        this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
+    }
+
+    public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+            ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
+        this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
+    }
+
+    private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+            ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
 
         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
-                new SynchronousQueue<Runnable>(), threadFactory);
+                queue, threadFactory, new FlexibleRejectionHandler());
         executor.prestartAllCoreThreads();
     }
 
+    /**
+     * Overriding the queue:
+     * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+     * occurs in RejectedExecutionHandler.
+     * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+     */
+    private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
+        final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<Runnable>(capacity.get()) : new LinkedBlockingQueue<Runnable>();
+        return new ForwardingBlockingQueue(delegate);
+    }
+
     @Override
     public ExecutorService getExecutor() {
         return Executors.unconfigurableExecutorService(executor);
@@ -77,4 +103,37 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
         executor.shutdown();
     }
 
+    /**
+     * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+     */
+    private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
+        @Override
+        public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+            try {
+                executor.getQueue().put(r);
+            } catch (InterruptedException e) {
+                throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
+            }
+        }
+    }
+
+    private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
+        private final BlockingQueue<Runnable> delegate;
+
+        public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected BlockingQueue<Runnable> delegate() {
+            return delegate;
+        }
+
+        @Override
+        public boolean offer(final Runnable r) {
+            // ThreadPoolExecutor will spawn a new thread after core size is reached only
+            // if the queue.offer returns false.
+            return false;
+        }
+    }
 }
index 94639d4..d6abe16 100644 (file)
@@ -17,6 +17,7 @@
 */
 package org.opendaylight.controller.config.yang.threadpool.impl.flexible;
 
+import com.google.common.base.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.config.api.JmxAttributeValidationException;
@@ -50,11 +51,15 @@ public final class FlexibleThreadPoolModule extends org.opendaylight.controller.
         JmxAttributeValidationException.checkNotNull(getMaxThreadCount(), maxThreadCountJmxAttribute);
         JmxAttributeValidationException.checkCondition(getMaxThreadCount() > 0, "must be greater than zero",
                 maxThreadCountJmxAttribute);
+
+        if(getQueueCapacity() != null) {
+            JmxAttributeValidationException.checkCondition(getQueueCapacity() > 0, "Queue capacity cannot be < 1", queueCapacityJmxAttribute);
+        }
     }
 
     @Override
     public java.lang.AutoCloseable createInstance() {
         return new FlexibleThreadPoolWrapper(getMinThreadCount(), getMaxThreadCount(), getKeepAliveMillis(),
-                TimeUnit.MILLISECONDS, getThreadFactoryDependency());
+                TimeUnit.MILLISECONDS, getThreadFactoryDependency(), Optional.fromNullable(getQueueCapacity()));
     }
 }
index be275ef..c124f63 100644 (file)
@@ -46,6 +46,12 @@ module threadpool-impl-flexible {
                 type uint32;
             }
 
+            leaf queueCapacity {
+                type uint16;
+                mandatory false;
+                description "Capacity of queue that holds waiting tasks";
+            }
+
             container threadFactory {
                 uses config:service-ref {
                     refine type {
index 8fedbe4..f81a332 100644 (file)
@@ -12,7 +12,7 @@
     <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
       <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
 
-        <!-- Netconf dispatcher to be used by all netconf-connectors --> 
+        <!-- Netconf dispatcher to be used by all netconf-connectors -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:client:dispatcher">prefix:netconf-client-dispatcher</type>
           <name>global-netconf-dispatcher</name>
           </timer>
         </module>
 
-        <!-- Thread factory to be used by all threadpools in netconf-connectors --> 
+        <!-- Thread factory to be used by all threadpools in netconf-connectors -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
           <name>global-netconf-processing-executor-threadfactory</name>
           <name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
-        </module> 
-        <!-- Flexible threadpool for all netconf connectors, Max thread count is set to 4 --> 
+        </module>
+        <!-- flexible threadpool for all netconf connectors, Max thread count is set to 4.  -->
         <module>
           <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">prefix:threadpool-flexible</type>
           <name>global-netconf-processing-executor</name>
           <minThreadCount xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">1</minThreadCount>
           <max-thread-count xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">4</max-thread-count>
           <keepAliveMillis xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">600000</keepAliveMillis>
+
           <threadFactory xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">
             <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadfactory</type>
             <name>global-netconf-processing-executor-threadfactory</name>
           </threadFactory>
-        </module>  
+        </module>
       </modules>
 
       <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">