From: Robert Varga Date: Tue, 3 Jun 2014 05:56:29 +0000 (+0200) Subject: Prefer more threads than deeper queue X-Git-Tag: release/helium~166^2~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=44d0a767828e9b523d3710db91cdda4e647d73f6;p=openflowplugin.git Prefer more threads than deeper queue ThreadPoolExecutor does not start spawning new threads until the underlying queue's offer() method return false. This means that we would completely fill the queue before kicking off more threads. This patch introduces a ForwardingBlockingQueue, which always returns false from offer(), thus forcing the threadpool to expand. The policy then uses put() to schedule (or block on scheduling) the tasks. Change-Id: I341f0ca7061f6b76ae1f3e049b12704f35140633 Signed-off-by: Robert Varga --- diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java index 8eddc86017..945b663fbd 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java @@ -13,11 +13,15 @@ import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -87,6 +91,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ForwardingBlockingQueue; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -103,7 +108,7 @@ public class MDController implements IMDController, AutoCloseable { private ConcurrentMap>>> messageTranslators; private Map, Collection>> popListeners; - private MessageSpy messageSpyCounter; + private MessageSpy messageSpyCounter; final private int OF10 = OFConstants.OFP_VERSION_1_0; final private int OF13 = OFConstants.OFP_VERSION_1_3; @@ -146,7 +151,7 @@ public class MDController implements IMDController, AutoCloseable { notificationPopListener.setNotificationProviderService( OFSessionUtil.getSessionManager().getNotificationProviderService()); notificationPopListener.setMessageSpy(messageSpyCounter); - + //TODO: move registration to factory addMessagePopListener(NodeErrorNotification.class, notificationPopListener); addMessagePopListener(BadActionErrorNotification.class, notificationPopListener); @@ -171,11 +176,11 @@ public class MDController implements IMDController, AutoCloseable { addMessagePopListener(SwitchFlowRemoved.class, notificationPopListener); addMessagePopListener(TableUpdated.class, notificationPopListener); - + //Notification registration for flow statistics addMessagePopListener(FlowsStatisticsUpdate.class, notificationPopListener); addMessagePopListener(AggregateFlowStatisticsUpdate.class, notificationPopListener); - + //Notification registrations for group-statistics addMessagePopListener(GroupStatisticsUpdated.class, notificationPopListener); addMessagePopListener(GroupFeaturesUpdated.class, notificationPopListener); @@ -188,38 +193,64 @@ public class MDController implements IMDController, AutoCloseable { //Notification registration for port-statistics addMessagePopListener(NodeConnectorStatisticsUpdate.class, notificationPopListener); - + //Notification registration for flow-table statistics addMessagePopListener(FlowTableStatisticsUpdate.class, notificationPopListener); - + //Notification registration for queue-statistics addMessagePopListener(QueueStatisticsUpdate.class, notificationPopListener); //Notification for LLDPSpeaker LLDPSpeakerPopListener lldpPopListener = new LLDPSpeakerPopListener(); addMessagePopListener(NodeConnectorUpdated.class,lldpPopListener); - + // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators); OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners); OFSessionUtil.getSessionManager().setMessageSpy(messageSpyCounter); - + // prepare worker pool for rpc // TODO: get size from configSubsystem int rpcThreadLimit = 10; ListeningExecutorService rpcPoolDelegator = createRpcPoolSpyDecorated(rpcThreadLimit, messageSpyCounter); OFSessionUtil.getSessionManager().setRpcPool(rpcPoolDelegator); - + } /** * @param rpcThreadLimit - * @param messageSpy + * @param messageSpy * @return */ - private static ListeningExecutorService createRpcPoolSpyDecorated(int rpcThreadLimit, MessageSpy messageSpy) { - ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L, - TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + private static ListeningExecutorService createRpcPoolSpyDecorated(final int rpcThreadLimit, final MessageSpy messageSpy) { + final BlockingQueue delegate = new LinkedBlockingQueue<>(100000); + final BlockingQueue queue = new ForwardingBlockingQueue() { + @Override + protected BlockingQueue delegate() { + return delegate; + } + + @Override + public boolean offer(final Runnable r) { + // ThreadPoolExecutor will spawn a new thread after core size is reached only + // if the queue.offer returns false. + return false; + } + }; + + ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L, + TimeUnit.MILLISECONDS, queue); + rpcPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException("Interrupted while waiting on queue", e); + } + + } + }); ListeningExecutorService listeningRpcPool = MoreExecutors.listeningDecorator(rpcPool); RpcListeningExecutorService rpcPoolDecorated = new RpcListeningExecutorService(listeningRpcPool); rpcPoolDecorated.setMessageSpy(messageSpy); @@ -230,7 +261,7 @@ public class MDController implements IMDController, AutoCloseable { * @param switchConnectionProviders * the switchConnectionProviders to set */ - public void setSwitchConnectionProviders(Collection switchConnectionProviders) { + public void setSwitchConnectionProviders(final Collection switchConnectionProviders) { this.switchConnectionProviders = switchConnectionProviders; } @@ -247,7 +278,7 @@ public class MDController implements IMDController, AutoCloseable { switchConnectionHandler.setMessageSpy(messageSpyCounter); errorHandler = new ErrorHandlerSimpleImpl(); - + switchConnectionHandler.setErrorHandler(errorHandler); switchConnectionHandler.init(); @@ -257,7 +288,7 @@ public class MDController implements IMDController, AutoCloseable { ListenableFuture isOnlineFuture = switchConnectionPrv.startup(); starterChain.add(isOnlineFuture); } - + Future> srvStarted = Futures.allAsList(starterChain); } @@ -305,7 +336,7 @@ public class MDController implements IMDController, AutoCloseable { } @Override - public void addMessageTranslator(Class messageType, int version, IMDMessageTranslator> translator) { + public void addMessageTranslator(final Class messageType, final int version, final IMDMessageTranslator> translator) { TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); Collection>> existingValues = messageTranslators.get(tKey); @@ -318,7 +349,7 @@ public class MDController implements IMDController, AutoCloseable { } @Override - public void removeMessageTranslator(Class messageType, int version, IMDMessageTranslator> translator) { + public void removeMessageTranslator(final Class messageType, final int version, final IMDMessageTranslator> translator) { TranslatorKey tKey = new TranslatorKey(version, messageType.getName()); Collection>> values = messageTranslators.get(tKey); if (values != null) { @@ -331,7 +362,7 @@ public class MDController implements IMDController, AutoCloseable { } @Override - public void addMessagePopListener(Class messageType, PopListener popListener) { + public void addMessagePopListener(final Class messageType, final PopListener popListener) { Collection> existingValues = popListeners.get(messageType); if (existingValues == null) { existingValues = new LinkedHashSet<>(); @@ -342,7 +373,7 @@ public class MDController implements IMDController, AutoCloseable { } @Override - public void removeMessagePopListener(Class messageType, PopListener popListener) { + public void removeMessagePopListener(final Class messageType, final PopListener popListener) { Collection> values = popListeners.get(messageType); if (values != null) { values.remove(popListener); @@ -357,10 +388,10 @@ public class MDController implements IMDController, AutoCloseable { * @param messageSpyCounter the messageSpyCounter to set */ public void setMessageSpyCounter( - MessageSpy messageSpyCounter) { + final MessageSpy messageSpyCounter) { this.messageSpyCounter = messageSpyCounter; } - + @Override public void close() { LOG.debug("close");