+ int queueSize = MAX_NOTIFICATION_QUEUE_SIZE;
+ String queueValue = System.getProperty(NOTIFICATION_QUEUE_SIZE_PROPERTY);
+ if (StringUtils.isNotBlank(queueValue)) {
+ try {
+ queueSize = Integer.parseInt(queueValue);
+ logger.trace("Queue size was set to {}", queueSize);
+ }catch(NumberFormatException e) {
+ logger.warn("Cannot parse {} as set by {}, using default {}", queueValue,
+ NOTIFICATION_QUEUE_SIZE_PROPERTY, queueSize);
+ }
+ }
+ // Overriding the queue:
+ // ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+ // occurs in RejectedExecutionHandler.
+ // This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+ BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueSize) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean offer(Runnable r) {
+ // ThreadPoolExecutor will spawn a new thread after core size is reached only if the queue.offer returns false.
+ return false;
+ }
+ };
+
+ ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-notification-%d").build();
+
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_NOTIFICATION_THREADS, MAX_NOTIFICATION_THREADS,
+ NOTIFICATION_THREAD_LIFE, TimeUnit.SECONDS, queue , factory,
+ new RejectedExecutionHandler() {
+ // if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();// set interrupt flag after clearing
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+
+ NOTIFICATION_EXECUTOR = MoreExecutors.listeningDecorator(executor);