Fixed netty & checkstyle failures
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / TcpHandler.java
index 72c5d07df0149704840c168e90d299128552185b..2391423103a6974864a51062c45a8b6878dd98e3 100644 (file)
@@ -13,16 +13,21 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
-import org.opendaylight.openflowjava.protocol.impl.connection.ServerFacade;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,7 +35,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
 /**
- * Class implementing server over TCP for handling incoming connections.
+ * Class implementing server over TCP / TLS for handling incoming connections.
  *
  * @author michal.polkorab
  */
@@ -46,17 +51,19 @@ public class TcpHandler implements ServerFacade {
      */
     private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(TcpHandler.class);
+    private static final Logger LOG = LoggerFactory.getLogger(TcpHandler.class);
 
     private int port;
     private String address;
     private final InetAddress startupAddress;
-    private NioEventLoopGroup workerGroup;
-    private NioEventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+    private EventLoopGroup bossGroup;
     private final SettableFuture<Boolean> isOnlineFuture;
     private ThreadConfiguration threadConfig;
 
-    private PublishingChannelInitializer channelInitializer;
+    private TcpChannelInitializer channelInitializer;
+
+    private Class<? extends ServerSocketChannel> socketChannelClass;
 
     /**
      * Constructor of TCPHandler that listens on selected port.
@@ -83,14 +90,6 @@ public class TcpHandler implements ServerFacade {
      */
     @Override
     public void run() {
-        if (threadConfig != null) {
-            bossGroup = new NioEventLoopGroup(threadConfig.getBossThreadCount());
-            workerGroup = new NioEventLoopGroup(threadConfig.getWorkerThreadCount());
-        } else {
-            bossGroup = new NioEventLoopGroup();
-            workerGroup = new NioEventLoopGroup();
-        }
-
         /*
          * We generally do not perform IO-unrelated tasks, so we want to have
          * all outstanding tasks completed before the executing thread goes
@@ -99,18 +98,19 @@ public class TcpHandler implements ServerFacade {
          * Any other setting means netty will measure the time it spent selecting
          * and spend roughly proportional time executing tasks.
          */
-        workerGroup.setIoRatio(100);
+        //workerGroup.setIoRatio(100);
 
         final ChannelFuture f;
         try {
             ServerBootstrap b = new ServerBootstrap();
             b.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
+                    .channel(socketChannelClass)
                     .handler(new LoggingHandler(LogLevel.DEBUG))
                     .childHandler(channelInitializer)
                     .option(ChannelOption.SO_BACKLOG, 128)
                     .option(ChannelOption.SO_REUSEADDR, true)
                     .childOption(ChannelOption.SO_KEEPALIVE, true)
+                    .childOption(ChannelOption.TCP_NODELAY , true)
                     .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                     .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATERMARK * 1024)
                     .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATERMARK * 1024)
@@ -122,7 +122,7 @@ public class TcpHandler implements ServerFacade {
                 f = b.bind(port).sync();
             }
         } catch (InterruptedException e) {
-            LOGGER.error("Interrupted while binding port {}", port, e);
+            LOG.error("Interrupted while binding port {}", port, e);
             return;
         }
 
@@ -133,12 +133,12 @@ public class TcpHandler implements ServerFacade {
             // Update port, as it may have been specified as 0
             this.port = isa.getPort();
 
-            LOGGER.debug("address from tcphandler: {}", address);
+            LOG.debug("address from tcphandler: {}", address);
             isOnlineFuture.set(true);
-            LOGGER.info("Switch listener started and ready to accept incoming connections on port: {}", port);
+            LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
             f.channel().closeFuture().sync();
         } catch (InterruptedException e) {
-            LOGGER.error("Interrupted while waiting for port {} shutdown", port, e);
+            LOG.error("Interrupted while waiting for port {} shutdown", port, e);
         } finally {
             shutdown();
         }
@@ -197,14 +197,73 @@ public class TcpHandler implements ServerFacade {
     /**
      * @param channelInitializer
      */
-    public void setChannelInitializer(PublishingChannelInitializer channelInitializer) {
+    public void setChannelInitializer(TcpChannelInitializer channelInitializer) {
         this.channelInitializer = channelInitializer;
     }
 
-    /**
-     * @param threadConfig EventLoopGroup configuration
-     */
+    @Override
     public void setThreadConfig(ThreadConfiguration threadConfig) {
         this.threadConfig = threadConfig;
     }
+
+    /**
+     * Initiate event loop groups
+     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
+     */
+    public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
+
+        if(isEpollEnabled) {
+            initiateEpollEventLoopGroups(threadConfiguration);
+        } else {
+            initiateNioEventLoopGroups(threadConfiguration);
+        }
+    }
+
+    /**
+     * Initiate Nio event loop groups
+     * @param threadConfiguration number of threads to be created, if not specified in threadConfig
+     */
+    public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
+        socketChannelClass = NioServerSocketChannel.class;
+        if (threadConfiguration != null) {
+            bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount());
+            workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
+        } else {
+            bossGroup = new NioEventLoopGroup();
+            workerGroup = new NioEventLoopGroup();
+        }
+        ((NioEventLoopGroup)workerGroup).setIoRatio(100);
+    }
+
+    /**
+     * Initiate Epoll event loop groups with Nio as fall back
+     * @param threadConfiguration
+     */
+    protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
+        try {
+            socketChannelClass = EpollServerSocketChannel.class;
+            if (threadConfiguration != null) {
+                    bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
+                workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
+            } else {
+                bossGroup = new EpollEventLoopGroup();
+                workerGroup = new EpollEventLoopGroup();
+            }
+            ((EpollEventLoopGroup)workerGroup).setIoRatio(100);
+            return;
+        } catch (Throwable ex) {
+            LOG.debug("Epoll initiation failed");
+        }
+
+        //Fallback mechanism
+        initiateNioEventLoopGroups(threadConfiguration);
+    }
+
+    /**
+     * @return workerGroup
+     */
+    public EventLoopGroup getWorkerGroup() {
+        return workerGroup;
+    }
+
 }