*/
package org.opendaylight.controller.sal.binding.codegen.impl;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import javassist.ClassPool;
-import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
-
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import javassist.ClassPool;
+
+import org.apache.commons.lang3.StringUtils;
+import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator;
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
public class SingletonHolder {
+ private static final Logger logger = LoggerFactory.getLogger(SingletonHolder.class);
public static final ClassPool CLASS_POOL = ClassPool.getDefault();
public static final org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator RPC_GENERATOR_IMPL = new org.opendaylight.controller.sal.binding.codegen.impl.RuntimeCodeGenerator(
public static final int CORE_NOTIFICATION_THREADS = 4;
public static final int MAX_NOTIFICATION_THREADS = 32;
+ // block caller thread after MAX_NOTIFICATION_THREADS + MAX_NOTIFICATION_QUEUE_SIZE pending notifications
+ public static final int MAX_NOTIFICATION_QUEUE_SIZE = 1000;
public static final int NOTIFICATION_THREAD_LIFE = 15;
+ private static final String NOTIFICATION_QUEUE_SIZE_PROPERTY = "mdsal.notificationqueue.size";
private static ListeningExecutorService NOTIFICATION_EXECUTOR = null;
private static ListeningExecutorService COMMIT_EXECUTOR = null;
* should use service injection to make the executor configurable.
*/
@Deprecated
- public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
+ public static synchronized ListeningExecutorService getDefaultNotificationExecutor() {
if (NOTIFICATION_EXECUTOR == null) {
- // Overriding the queue since we need an unbounded queue
- // and threadpoolexecutor would not create new threads if the queue is not full
- BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
+ int queueSize = MAX_NOTIFICATION_QUEUE_SIZE;
+ String queueValue = System.getProperty(NOTIFICATION_QUEUE_SIZE_PROPERTY);
+ if (StringUtils.isNotBlank(queueValue)) {
+ try {
+ queueSize = Integer.parseInt(queueValue);
+ logger.trace("Queue size was set to {}", queueSize);
+ }catch(NumberFormatException e) {
+ logger.warn("Cannot parse {} as set by {}, using default {}", queueValue,
+ NOTIFICATION_QUEUE_SIZE_PROPERTY, queueSize);
+ }
+ }
+ // Overriding the queue:
+ // ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+ // occurs in RejectedExecutionHandler.
+ // This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+ BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize) {
+ private static final long serialVersionUID = 1L;
+
@Override
public boolean offer(Runnable r) {
- if (size() <= 1) {
- // if the queue is empty (or has just 1), no need to rampup the threads
- return super.offer(r);
- } else {
- // if the queue is not empty, force the queue to return false.
- // threadpoolexecutor will spawn a new thread if the queue.offer returns false.
- return false;
- }
+ // ThreadPoolExecutor will spawn a new thread after core size is reached only if the queue.offer returns false.
+ return false;
}
};
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
- e.printStackTrace();
+ Thread.currentThread().interrupt();// set interrupt flag after clearing
+ throw new IllegalStateException(e);
}
}
});
* should use service injection to make the executor configurable.
*/
@Deprecated
- public static synchronized final ListeningExecutorService getDefaultCommitExecutor() {
+ public static synchronized ListeningExecutorService getDefaultCommitExecutor() {
if (COMMIT_EXECUTOR == null) {
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-commit-%d").build();
/*