Bug 930 - Add bound to queue in SingletonHandler#NOTIFICATION_EXECUTOR . 18/6618/1
authorTomas Olvecky <tolvecky@cisco.com>
Wed, 30 Apr 2014 10:36:49 +0000 (12:36 +0200)
committerTomas Olvecky <tolvecky@cisco.com>
Wed, 30 Apr 2014 10:36:49 +0000 (12:36 +0200)
Add queue capacity with hardcoded value 10. When maxPoolSize + queue capacity
is reached, caller thread will be blocked.
Mark thread as interrupted when catching InterruptedException, propagate it
to caller.
Remove code that pushed first two runnables to queue when core size was
reached - saturate whole threadpool capacity first.

Change-Id: I9c52b52eac127bea0cbb2abf7a65c0165daf8895
Signed-off-by: Tomas Olvecky <tolvecky@cisco.com>
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolderTest.java [new file with mode: 0644]

index ac26445bf408677b6e397c9fbb77bcf4fcaa19de..4141bba2d44ec1ec13502c984efc36e282f4f912 100644 (file)
@@ -33,6 +33,8 @@ public class SingletonHolder {
 
     public static final int CORE_NOTIFICATION_THREADS = 4;
     public static final int MAX_NOTIFICATION_THREADS = 32;
 
     public static final int CORE_NOTIFICATION_THREADS = 4;
     public static final int MAX_NOTIFICATION_THREADS = 32;
+    // block caller thread after MAX_NOTIFICATION_THREADS + MAX_NOTIFICATION_QUEUE_SIZE pending notifications
+    public static final int MAX_NOTIFICATION_QUEUE_SIZE = 10;
     public static final int NOTIFICATION_THREAD_LIFE = 15;
 
     private static ListeningExecutorService NOTIFICATION_EXECUTOR = null;
     public static final int NOTIFICATION_THREAD_LIFE = 15;
 
     private static ListeningExecutorService NOTIFICATION_EXECUTOR = null;
@@ -47,19 +49,15 @@ public class SingletonHolder {
     public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
 
         if (NOTIFICATION_EXECUTOR == null) {
     public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
 
         if (NOTIFICATION_EXECUTOR == null) {
-            // Overriding the queue since we need an unbounded queue
-            // and threadpoolexecutor would not create new threads if the queue is not full
-            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
+            // Overriding the queue:
+            // ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+            // occurs in RejectedExecutionHandler.
+            // This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(MAX_NOTIFICATION_QUEUE_SIZE) {
                 @Override
                 public boolean offer(Runnable r) {
                 @Override
                 public boolean offer(Runnable r) {
-                    if (size() <= 1) {
-                        // if the queue is empty (or has just 1), no need to rampup the threads
-                        return super.offer(r);
-                    } else {
-                        // if the queue is not empty, force the queue to return false.
-                        // threadpoolexecutor will spawn a new thread if the queue.offer returns false.
-                        return false;
-                    }
+                    // ThreadPoolExecutor will spawn a new thread after core size is reached only if the queue.offer returns false.
+                    return false;
                 }
             };
 
                 }
             };
 
@@ -74,7 +72,8 @@ public class SingletonHolder {
                             try {
                                 executor.getQueue().put(r);
                             } catch (InterruptedException e) {
                             try {
                                 executor.getQueue().put(r);
                             } catch (InterruptedException e) {
-                                e.printStackTrace();
+                                Thread.currentThread().interrupt();// set interrupt flag after clearing
+                                throw new IllegalStateException(e);
                             }
                         }
                     });
                             }
                         }
                     });
diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolderTest.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolderTest.java
new file mode 100644 (file)
index 0000000..0e4c5cc
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.binding.codegen.impl;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import java.lang.reflect.Field;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Ignore
+public class SingletonHolderTest {
+    private static final Logger logger = LoggerFactory.getLogger(SingletonHolderTest.class);
+
+    @Test
+    public void testNotificationExecutor() throws Exception {
+        ListeningExecutorService executor = SingletonHolder.getDefaultNotificationExecutor();
+        ThreadPoolExecutor tpExecutor = (ThreadPoolExecutor) setAccessible(executor.getClass().getDeclaredField("delegate")).get(executor);
+        BlockingQueue<Runnable> queue = tpExecutor.getQueue();
+
+        for (int idx = 0; idx < 100; idx++) {
+            final int idx2 = idx;
+            logger.info("Adding {}\t{}\t{}", idx, queue.size(), tpExecutor.getActiveCount());
+            executor.execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    logger.info("in  {}", idx2);
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    logger.info("out {}", idx2);
+                }
+            });
+        }
+        executor.shutdown();
+        executor.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    private static Field setAccessible(Field field) {
+        field.setAccessible(true);
+        return field;
+    }
+}