import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+
import java.net.InetAddress;
import java.net.InetSocketAddress;
import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
-import org.opendaylight.openflowjava.protocol.impl.connection.ServerFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.SettableFuture;
/**
- * Class implementing server over TCP for handling incoming connections.
+ * Class implementing server over TCP / TLS for handling incoming connections.
*
* @author michal.polkorab
*/
*/
private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
- private static final Logger LOGGER = LoggerFactory.getLogger(TcpHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TcpHandler.class);
private int port;
private String address;
private final InetAddress startupAddress;
- private NioEventLoopGroup workerGroup;
- private NioEventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private EventLoopGroup bossGroup;
private final SettableFuture<Boolean> isOnlineFuture;
private ThreadConfiguration threadConfig;
- private PublishingChannelInitializer channelInitializer;
+ private TcpChannelInitializer channelInitializer;
+
+ private Class<? extends ServerSocketChannel> socketChannelClass;
/**
* Constructor of TCPHandler that listens on selected port.
*/
@Override
public void run() {
- if (threadConfig != null) {
- bossGroup = new NioEventLoopGroup(threadConfig.getBossThreadCount());
- workerGroup = new NioEventLoopGroup(threadConfig.getWorkerThreadCount());
- } else {
- bossGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
- }
-
/*
* We generally do not perform IO-unrelated tasks, so we want to have
* all outstanding tasks completed before the executing thread goes
* Any other setting means netty will measure the time it spent selecting
* and spend roughly proportional time executing tasks.
*/
- workerGroup.setIoRatio(100);
+ //workerGroup.setIoRatio(100);
final ChannelFuture f;
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
+ .channel(socketChannelClass)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(channelInitializer)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.TCP_NODELAY , true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATERMARK * 1024)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATERMARK * 1024)
f = b.bind(port).sync();
}
} catch (InterruptedException e) {
- LOGGER.error("Interrupted while binding port {}", port, e);
+ LOG.error("Interrupted while binding port {}", port, e);
return;
}
// Update port, as it may have been specified as 0
this.port = isa.getPort();
- LOGGER.debug("address from tcphandler: {}", address);
+ LOG.debug("address from tcphandler: {}", address);
isOnlineFuture.set(true);
- LOGGER.info("Switch listener started and ready to accept incoming connections on port: {}", port);
+ LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
- LOGGER.error("Interrupted while waiting for port {} shutdown", port, e);
+ LOG.error("Interrupted while waiting for port {} shutdown", port, e);
} finally {
shutdown();
}
/**
* @param channelInitializer
*/
- public void setChannelInitializer(PublishingChannelInitializer channelInitializer) {
+ public void setChannelInitializer(TcpChannelInitializer channelInitializer) {
this.channelInitializer = channelInitializer;
}
- /**
- * @param threadConfig EventLoopGroup configuration
- */
+ @Override
public void setThreadConfig(ThreadConfiguration threadConfig) {
this.threadConfig = threadConfig;
}
+
+ /**
+ * Initiate event loop groups
+ * @param threadConfiguration number of threads to be created, if not specified in threadConfig
+ */
+ public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
+
+ if(isEpollEnabled) {
+ initiateEpollEventLoopGroups(threadConfiguration);
+ } else {
+ initiateNioEventLoopGroups(threadConfiguration);
+ }
+ }
+
+ /**
+ * Initiate Nio event loop groups
+ * @param threadConfiguration number of threads to be created, if not specified in threadConfig
+ */
+ public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
+ socketChannelClass = NioServerSocketChannel.class;
+ if (threadConfiguration != null) {
+ bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount());
+ workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
+ } else {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+ }
+ ((NioEventLoopGroup)workerGroup).setIoRatio(100);
+ }
+
+ /**
+ * Initiate Epoll event loop groups with Nio as fall back
+ * @param threadConfiguration
+ */
+ protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
+ try {
+ socketChannelClass = EpollServerSocketChannel.class;
+ if (threadConfiguration != null) {
+ bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
+ workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
+ } else {
+ bossGroup = new EpollEventLoopGroup();
+ workerGroup = new EpollEventLoopGroup();
+ }
+ ((EpollEventLoopGroup)workerGroup).setIoRatio(100);
+ return;
+ } catch (Throwable ex) {
+ LOG.debug("Epoll initiation failed");
+ }
+
+ //Fallback mechanism
+ initiateNioEventLoopGroups(threadConfiguration);
+ }
+
+ /**
+ * @return workerGroup
+ */
+ public EventLoopGroup getWorkerGroup() {
+ return workerGroup;
+ }
+
}