Prefer more threads than deeper queue 31/7631/1
authorRobert Varga <robert.varga@pantheon.sk>
Tue, 3 Jun 2014 05:56:29 +0000 (07:56 +0200)
committerRobert Varga <robert.varga@pantheon.sk>
Tue, 3 Jun 2014 08:49:51 +0000 (10:49 +0200)
ThreadPoolExecutor does not start spawning new threads until the
underlying queue's offer() method return false. This means that we would
completely fill the queue before kicking off more threads.

This patch introduces a ForwardingBlockingQueue, which always returns
false from offer(), thus forcing the threadpool to expand. The policy
then uses put() to schedule (or block on scheduling) the tasks.

Change-Id: I341f0ca7061f6b76ae1f3e049b12704f35140633
Signed-off-by: Robert Varga <robert.varga@pantheon.sk>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java

index 8eddc86017916db6f2faac3f9f2bce6e004e2a8b..945b663fbd1e23a73a2c2eac95fe490447d441bc 100644 (file)
@@ -13,11 +13,15 @@ import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -87,6 +91,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ForwardingBlockingQueue;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -103,7 +108,7 @@ public class MDController implements IMDController, AutoCloseable {
 
     private ConcurrentMap<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> messageTranslators;
     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListeners;
-    private MessageSpy<DataContainer> messageSpyCounter; 
+    private MessageSpy<DataContainer> messageSpyCounter;
 
     final private int OF10 = OFConstants.OFP_VERSION_1_0;
     final private int OF13 = OFConstants.OFP_VERSION_1_3;
@@ -146,7 +151,7 @@ public class MDController implements IMDController, AutoCloseable {
         notificationPopListener.setNotificationProviderService(
                 OFSessionUtil.getSessionManager().getNotificationProviderService());
         notificationPopListener.setMessageSpy(messageSpyCounter);
-        
+
         //TODO: move registration to factory
         addMessagePopListener(NodeErrorNotification.class, notificationPopListener);
         addMessagePopListener(BadActionErrorNotification.class, notificationPopListener);
@@ -171,11 +176,11 @@ public class MDController implements IMDController, AutoCloseable {
 
         addMessagePopListener(SwitchFlowRemoved.class, notificationPopListener);
         addMessagePopListener(TableUpdated.class, notificationPopListener);
-        
+
         //Notification registration for flow statistics
         addMessagePopListener(FlowsStatisticsUpdate.class, notificationPopListener);
         addMessagePopListener(AggregateFlowStatisticsUpdate.class, notificationPopListener);
-        
+
         //Notification registrations for group-statistics
         addMessagePopListener(GroupStatisticsUpdated.class, notificationPopListener);
         addMessagePopListener(GroupFeaturesUpdated.class, notificationPopListener);
@@ -188,38 +193,64 @@ public class MDController implements IMDController, AutoCloseable {
 
         //Notification registration for port-statistics
         addMessagePopListener(NodeConnectorStatisticsUpdate.class, notificationPopListener);
-        
+
         //Notification registration for flow-table statistics
         addMessagePopListener(FlowTableStatisticsUpdate.class, notificationPopListener);
-        
+
         //Notification registration for queue-statistics
         addMessagePopListener(QueueStatisticsUpdate.class, notificationPopListener);
 
         //Notification for LLDPSpeaker
         LLDPSpeakerPopListener<DataObject> lldpPopListener  = new LLDPSpeakerPopListener<DataObject>();
         addMessagePopListener(NodeConnectorUpdated.class,lldpPopListener);
-        
+
         // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually
         OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators);
         OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners);
         OFSessionUtil.getSessionManager().setMessageSpy(messageSpyCounter);
-        
+
         // prepare worker pool for rpc
         // TODO: get size from configSubsystem
         int rpcThreadLimit = 10;
         ListeningExecutorService rpcPoolDelegator = createRpcPoolSpyDecorated(rpcThreadLimit, messageSpyCounter);
         OFSessionUtil.getSessionManager().setRpcPool(rpcPoolDelegator);
-        
+
     }
 
     /**
      * @param rpcThreadLimit
-     * @param messageSpy 
+     * @param messageSpy
      * @return
      */
-    private static ListeningExecutorService createRpcPoolSpyDecorated(int rpcThreadLimit, MessageSpy<DataContainer> messageSpy) {
-        ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L, 
-                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+    private static ListeningExecutorService createRpcPoolSpyDecorated(final int rpcThreadLimit, final MessageSpy<DataContainer> messageSpy) {
+        final BlockingQueue<Runnable> delegate = new LinkedBlockingQueue<>(100000);
+        final BlockingQueue<Runnable> queue = new ForwardingBlockingQueue<Runnable>() {
+            @Override
+            protected BlockingQueue<Runnable> delegate() {
+                return delegate;
+            }
+
+            @Override
+            public boolean offer(final Runnable r) {
+                // ThreadPoolExecutor will spawn a new thread after core size is reached only
+                // if the queue.offer returns false.
+                return false;
+            }
+        };
+
+        ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L,
+                TimeUnit.MILLISECONDS, queue);
+        rpcPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
+                       @Override
+                       public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+                               try {
+                                       executor.getQueue().put(r);
+                               } catch (InterruptedException e) {
+                                       throw new RejectedExecutionException("Interrupted while waiting on queue", e);
+                               }
+
+                       }
+               });
         ListeningExecutorService listeningRpcPool = MoreExecutors.listeningDecorator(rpcPool);
         RpcListeningExecutorService rpcPoolDecorated = new RpcListeningExecutorService(listeningRpcPool);
         rpcPoolDecorated.setMessageSpy(messageSpy);
@@ -230,7 +261,7 @@ public class MDController implements IMDController, AutoCloseable {
      * @param switchConnectionProviders
      *            the switchConnectionProviders to set
      */
-    public void setSwitchConnectionProviders(Collection<SwitchConnectionProvider> switchConnectionProviders) {
+    public void setSwitchConnectionProviders(final Collection<SwitchConnectionProvider> switchConnectionProviders) {
         this.switchConnectionProviders = switchConnectionProviders;
     }
 
@@ -247,7 +278,7 @@ public class MDController implements IMDController, AutoCloseable {
         switchConnectionHandler.setMessageSpy(messageSpyCounter);
 
         errorHandler = new ErrorHandlerSimpleImpl();
-        
+
         switchConnectionHandler.setErrorHandler(errorHandler);
         switchConnectionHandler.init();
 
@@ -257,7 +288,7 @@ public class MDController implements IMDController, AutoCloseable {
             ListenableFuture<Boolean> isOnlineFuture = switchConnectionPrv.startup();
             starterChain.add(isOnlineFuture);
         }
-        
+
         Future<List<Boolean>> srvStarted = Futures.allAsList(starterChain);
     }
 
@@ -305,7 +336,7 @@ public class MDController implements IMDController, AutoCloseable {
     }
 
     @Override
-    public void addMessageTranslator(Class<? extends DataObject> messageType, int version, IMDMessageTranslator<OfHeader, List<DataObject>> translator) {
+    public void addMessageTranslator(final Class<? extends DataObject> messageType, final int version, final IMDMessageTranslator<OfHeader, List<DataObject>> translator) {
         TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
 
         Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> existingValues = messageTranslators.get(tKey);
@@ -318,7 +349,7 @@ public class MDController implements IMDController, AutoCloseable {
     }
 
     @Override
-    public void removeMessageTranslator(Class<? extends DataObject> messageType, int version, IMDMessageTranslator<OfHeader, List<DataObject>> translator) {
+    public void removeMessageTranslator(final Class<? extends DataObject> messageType, final int version, final IMDMessageTranslator<OfHeader, List<DataObject>> translator) {
         TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
         Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> values = messageTranslators.get(tKey);
         if (values != null) {
@@ -331,7 +362,7 @@ public class MDController implements IMDController, AutoCloseable {
     }
 
     @Override
-    public void addMessagePopListener(Class<? extends DataObject> messageType, PopListener<DataObject> popListener) {
+    public void addMessagePopListener(final Class<? extends DataObject> messageType, final PopListener<DataObject> popListener) {
         Collection<PopListener<DataObject>> existingValues = popListeners.get(messageType);
         if (existingValues == null) {
             existingValues = new LinkedHashSet<>();
@@ -342,7 +373,7 @@ public class MDController implements IMDController, AutoCloseable {
     }
 
     @Override
-    public void removeMessagePopListener(Class<? extends DataObject> messageType, PopListener<DataObject> popListener) {
+    public void removeMessagePopListener(final Class<? extends DataObject> messageType, final PopListener<DataObject> popListener) {
         Collection<PopListener<DataObject>> values = popListeners.get(messageType);
         if (values != null) {
             values.remove(popListener);
@@ -357,10 +388,10 @@ public class MDController implements IMDController, AutoCloseable {
      * @param messageSpyCounter the messageSpyCounter to set
      */
     public void setMessageSpyCounter(
-            MessageSpy<DataContainer> messageSpyCounter) {
+            final MessageSpy<DataContainer> messageSpyCounter) {
         this.messageSpyCounter = messageSpyCounter;
     }
-    
+
     @Override
     public void close() {
         LOG.debug("close");