Allow to configure number of threads used by Netty's EventLoopGroups
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / TcpHandler.java
index 5cd978c361d186980954a39b434e05d1a56254eb..72c5d07df0149704840c168e90d299128552185b 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.openflowjava.protocol.impl.core;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -20,10 +21,8 @@ import io.netty.util.concurrent.GenericFutureListener;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
-import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
+import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
 import org.opendaylight.openflowjava.protocol.impl.connection.ServerFacade;
-import org.opendaylight.openflowjava.protocol.impl.deserialization.DeserializationFactory;
-import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,48 +54,9 @@ public class TcpHandler implements ServerFacade {
     private NioEventLoopGroup workerGroup;
     private NioEventLoopGroup bossGroup;
     private final SettableFuture<Boolean> isOnlineFuture;
+    private ThreadConfiguration threadConfig;
 
-    private final PublishingChannelInitializer channelInitializer;
-
-    /**
-     * Enum used for storing names of used components (in pipeline).
-     */
-    public static enum COMPONENT_NAMES {
-
-        /**
-         * Detects switch idle state
-         */
-        IDLE_HANDLER,
-        /**
-         * Detects TLS connections
-         */
-        TLS_DETECTOR,
-        /**
-         * Component for handling TLS frames
-         */
-        SSL_HANDLER,
-        /**
-         * Decodes incoming messages into message frames
-         */
-        OF_FRAME_DECODER,
-        /**
-         * Detects version of incoming OpenFlow Protocol message
-         */
-        OF_VERSION_DETECTOR,
-        /**
-         * Transforms OpenFlow Protocol byte messages into POJOs
-         */
-        OF_DECODER,
-        /**
-         * Transforms POJOs into OpenFlow Protocol byte messages
-         */
-        OF_ENCODER,
-        /**
-         * Delegates translated POJOs into MessageConsumer
-         */
-        DELEGATING_INBOUND_HANDLER,
-    }
-
+    private PublishingChannelInitializer channelInitializer;
 
     /**
      * Constructor of TCPHandler that listens on selected port.
@@ -115,7 +75,6 @@ public class TcpHandler implements ServerFacade {
     public TcpHandler(final InetAddress address, final int port) {
         this.port = port;
         this.startupAddress = address;
-        channelInitializer = new PublishingChannelInitializer();
         isOnlineFuture = SettableFuture.create();
     }
 
@@ -124,8 +83,13 @@ public class TcpHandler implements ServerFacade {
      */
     @Override
     public void run() {
-        bossGroup = new NioEventLoopGroup();
-        workerGroup = new NioEventLoopGroup();
+        if (threadConfig != null) {
+            bossGroup = new NioEventLoopGroup(threadConfig.getBossThreadCount());
+            workerGroup = new NioEventLoopGroup(threadConfig.getWorkerThreadCount());
+        } else {
+            bossGroup = new NioEventLoopGroup();
+            workerGroup = new NioEventLoopGroup();
+        }
 
         /*
          * We generally do not perform IO-unrelated tasks, so we want to have
@@ -147,6 +111,7 @@ public class TcpHandler implements ServerFacade {
                     .option(ChannelOption.SO_BACKLOG, 128)
                     .option(ChannelOption.SO_REUSEADDR, true)
                     .childOption(ChannelOption.SO_KEEPALIVE, true)
+                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                     .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATERMARK * 1024)
                     .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATERMARK * 1024)
                     .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
@@ -210,13 +175,6 @@ public class TcpHandler implements ServerFacade {
         return channelInitializer.size();
     }
 
-    /**
-     * @return channelInitializer providing channels
-     */
-    public PublishingChannelInitializer getChannelInitializer() {
-        return channelInitializer;
-    }
-
     @Override
     public ListenableFuture<Boolean> getIsOnlineFuture() {
         return isOnlineFuture;
@@ -237,39 +195,16 @@ public class TcpHandler implements ServerFacade {
     }
 
     /**
-     * @param switchConnectionHandler
-     */
-    public void setSwitchConnectionHandler(
-            final SwitchConnectionHandler switchConnectionHandler) {
-        channelInitializer.setSwitchConnectionHandler(switchConnectionHandler);
-    }
-
-    /**
-     * @param switchIdleTimeout in milliseconds
-     */
-    public void setSwitchIdleTimeout(final long switchIdleTimeout) {
-        channelInitializer.setSwitchIdleTimeout(switchIdleTimeout);
-    }
-
-    /**
-     * @param tlsSupported
-     */
-    public void setEncryption(final boolean tlsSupported) {
-        channelInitializer.setEncryption(tlsSupported);
-    }
-
-    /**
-     * @param sf serialization factory
+     * @param channelInitializer
      */
-    public void setSerializationFactory(final SerializationFactory sf) {
-        channelInitializer.setSerializationFactory(sf);
+    public void setChannelInitializer(PublishingChannelInitializer channelInitializer) {
+        this.channelInitializer = channelInitializer;
     }
 
     /**
-     * @param factory
+     * @param threadConfig EventLoopGroup configuration
      */
-    public void setDeserializationFactory(final DeserializationFactory factory) {
-        channelInitializer.setDeserializationFactory(factory);
+    public void setThreadConfig(ThreadConfiguration threadConfig) {
+        this.threadConfig = threadConfig;
     }
-
 }