From: Tomas Olvecky Date: Wed, 30 Apr 2014 10:36:49 +0000 (+0200) Subject: Bug 930 - Add bound to queue in SingletonHandler#NOTIFICATION_EXECUTOR . X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~153^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=8e654401ca264098b4bcfb25fe1411caed10ebe2 Bug 930 - Add bound to queue in SingletonHandler#NOTIFICATION_EXECUTOR . Add queue capacity with hardcoded value 10. When maxPoolSize + queue capacity is reached, caller thread will be blocked. Mark thread as interrupted when catching InterruptedException, propagate it to caller. Remove code that pushed first two runnables to queue when core size was reached - saturate whole threadpool capacity first. Change-Id: I9c52b52eac127bea0cbb2abf7a65c0165daf8895 Signed-off-by: Tomas Olvecky --- diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java index ac26445bf4..4141bba2d4 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java @@ -33,6 +33,8 @@ public class SingletonHolder { public static final int CORE_NOTIFICATION_THREADS = 4; public static final int MAX_NOTIFICATION_THREADS = 32; + // block caller thread after MAX_NOTIFICATION_THREADS + MAX_NOTIFICATION_QUEUE_SIZE pending notifications + public static final int MAX_NOTIFICATION_QUEUE_SIZE = 10; public static final int NOTIFICATION_THREAD_LIFE = 15; private static ListeningExecutorService NOTIFICATION_EXECUTOR = null; @@ -47,19 +49,15 @@ public class SingletonHolder { public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() { if (NOTIFICATION_EXECUTOR == null) { - // Overriding the queue since we need an unbounded queue - // and threadpoolexecutor would not create new threads if the queue is not full - BlockingQueue queue = new LinkedBlockingQueue() { + // Overriding the queue: + // ThreadPoolExecutor would not create new threads if the queue is not full, thus adding + // occurs in RejectedExecutionHandler. + // This impl saturates threadpool first, then queue. When both are full caller will get blocked. + BlockingQueue queue = new LinkedBlockingQueue(MAX_NOTIFICATION_QUEUE_SIZE) { @Override public boolean offer(Runnable r) { - if (size() <= 1) { - // if the queue is empty (or has just 1), no need to rampup the threads - return super.offer(r); - } else { - // if the queue is not empty, force the queue to return false. - // threadpoolexecutor will spawn a new thread if the queue.offer returns false. - return false; - } + // ThreadPoolExecutor will spawn a new thread after core size is reached only if the queue.offer returns false. + return false; } }; @@ -74,7 +72,8 @@ public class SingletonHolder { try { executor.getQueue().put(r); } catch (InterruptedException e) { - e.printStackTrace(); + Thread.currentThread().interrupt();// set interrupt flag after clearing + throw new IllegalStateException(e); } } }); diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolderTest.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolderTest.java new file mode 100644 index 0000000000..0e4c5ccb84 --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolderTest.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.binding.codegen.impl; + +import com.google.common.util.concurrent.ListeningExecutorService; +import java.lang.reflect.Field; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Ignore +public class SingletonHolderTest { + private static final Logger logger = LoggerFactory.getLogger(SingletonHolderTest.class); + + @Test + public void testNotificationExecutor() throws Exception { + ListeningExecutorService executor = SingletonHolder.getDefaultNotificationExecutor(); + ThreadPoolExecutor tpExecutor = (ThreadPoolExecutor) setAccessible(executor.getClass().getDeclaredField("delegate")).get(executor); + BlockingQueue queue = tpExecutor.getQueue(); + + for (int idx = 0; idx < 100; idx++) { + final int idx2 = idx; + logger.info("Adding {}\t{}\t{}", idx, queue.size(), tpExecutor.getActiveCount()); + executor.execute(new Runnable() { + + @Override + public void run() { + logger.info("in {}", idx2); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + logger.info("out {}", idx2); + } + }); + } + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + } + + private static Field setAccessible(Field field) { + field.setAccessible(true); + return field; + } +}