*/
package org.opendaylight.controller.sal.binding.codegen.impl;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import javassist.ClassPool;
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
+
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import javassist.ClassPool;
-
-import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
-
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
public class SingletonHolder {
- public static final ClassPool CLASS_POOL = new ClassPool();
+ public static final ClassPool CLASS_POOL = ClassPool.getDefault();
public static final org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator RPC_GENERATOR_IMPL = new org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator(
CLASS_POOL);
public static final RuntimeCodeGenerator RPC_GENERATOR = RPC_GENERATOR_IMPL;
*/
@Deprecated
public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
+
if (NOTIFICATION_EXECUTOR == null) {
+ // Overriding the queue since we need an unbounded queue
+ // and threadpoolexecutor would not create new threads if the queue is not full
+ BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
+ @Override
+ public boolean offer(Runnable r) {
+ if (size() <= 1) {
+ // if the queue is empty (or has just 1), no need to rampup the threads
+ return super.offer(r);
+ } else {
+ // if the queue is not empty, force the queue to return false.
+ // threadpoolexecutor will spawn a new thread if the queue.offer returns false.
+ return false;
+ }
+ }
+ };
+
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-notification-%d").build();
- ExecutorService executor = new ThreadPoolExecutor(CORE_NOTIFICATION_THREADS, MAX_NOTIFICATION_THREADS,
- NOTIFICATION_THREAD_LIFE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory);
+
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_NOTIFICATION_THREADS, MAX_NOTIFICATION_THREADS,
+ NOTIFICATION_THREAD_LIFE, TimeUnit.SECONDS, queue , factory,
+ new RejectedExecutionHandler() {
+ // if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
NOTIFICATION_EXECUTOR = MoreExecutors.listeningDecorator(executor);
}
+
return NOTIFICATION_EXECUTOR;
}