X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2FTcpHandler.java;h=59675b2700172c8df5bb34c7ae47ae0c198a9cce;hb=fbfd230558d9c43581f8b5d08eb51d0bbde02a4f;hp=7c2041d6a85c4ea5424afe8625b832ab0fcdf306;hpb=8dda3700d7c10753ead351f87d6e4b4699761fe4;p=openflowjava.git diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java index 7c2041d6..59675b27 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/TcpHandler.java @@ -1,7 +1,15 @@ -/* Copyright (C)2013 Pantheon Technologies, s.r.o. All rights reserved. */ +/* + * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.openflowjava.protocol.impl.core; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; @@ -13,6 +21,7 @@ import io.netty.util.concurrent.GenericFutureListener; import java.net.InetAddress; import java.net.InetSocketAddress; +import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration; import org.opendaylight.openflowjava.protocol.impl.connection.ServerFacade; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,77 +30,51 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; /** - * Class implementing server over TCP for handling incoming connections. + * Class implementing server over TCP / TLS for handling incoming connections. * * @author michal.polkorab */ public class TcpHandler implements ServerFacade { + /* + * High/low write watermarks, in KiB. + */ + private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64; + private static final int DEFAULT_WRITE_LOW_WATERMARK = 32; + /* + * Write spin count. This tells netty to immediately retry a non-blocking + * write this many times before moving on to selecting. + */ + private static final int DEFAULT_WRITE_SPIN_COUNT = 16; + + private static final Logger LOGGER = LoggerFactory.getLogger(TcpHandler.class); private int port; private String address; - private InetAddress startupAddress; + private final InetAddress startupAddress; private NioEventLoopGroup workerGroup; private NioEventLoopGroup bossGroup; - private static final Logger LOGGER = LoggerFactory.getLogger(TcpHandler.class); - private SettableFuture isOnlineFuture; - - - private PublishingChannelInitializer channelInitializer; - - /** - * Enum used for storing names of used components (in pipeline). - */ - public static enum COMPONENT_NAMES { + private final SettableFuture isOnlineFuture; + private ThreadConfiguration threadConfig; - /** - * First component in pipeline - detecting TLS connections - */ - TLS_DETECTOR, - /** - * Component for handling TLS frames - */ - SSL_HANDLER, - /** - * Decodes incoming messages into message frames - */ - OF_FRAME_DECODER, - /** - * Detects version of incoming OpenFlow Protocol message - */ - OF_VERSION_DETECTOR, - /** - * Transforms OpenFlow Protocol byte messages into POJOs - */ - OF_DECODER, - /** - * Transforms POJOs into OpenFlow Protocol byte messages - */ - OF_ENCODER, - /** - * Delegates translated POJOs into MessageConsumer - */ - DELEGATING_INBOUND_HANDLER, - } - + private TcpChannelInitializer channelInitializer; /** * Constructor of TCPHandler that listens on selected port. * * @param port listening port of TCPHandler server */ - public TcpHandler(int port) { + public TcpHandler(final int port) { this(null, port); } - + /** * Constructor of TCPHandler that listens on selected address and port. * @param address listening address of TCPHandler server * @param port listening port of TCPHandler server */ - public TcpHandler(InetAddress address, int port) { + public TcpHandler(final InetAddress address, final int port) { this.port = port; this.startupAddress = address; - channelInitializer = new PublishingChannelInitializer(); isOnlineFuture = SettableFuture.create(); } @@ -100,9 +83,25 @@ public class TcpHandler implements ServerFacade { */ @Override public void run() { - LOGGER.info("Switch "); - bossGroup = new NioEventLoopGroup(); - workerGroup = new NioEventLoopGroup(); + if (threadConfig != null) { + bossGroup = new NioEventLoopGroup(threadConfig.getBossThreadCount()); + workerGroup = new NioEventLoopGroup(threadConfig.getWorkerThreadCount()); + } else { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + } + + /* + * We generally do not perform IO-unrelated tasks, so we want to have + * all outstanding tasks completed before the executing thread goes + * back into select. + * + * Any other setting means netty will measure the time it spent selecting + * and spend roughly proportional time executing tasks. + */ + workerGroup.setIoRatio(100); + + final ChannelFuture f; try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) @@ -110,24 +109,36 @@ public class TcpHandler implements ServerFacade { .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(channelInitializer) .option(ChannelOption.SO_BACKLOG, 128) - .childOption(ChannelOption.SO_KEEPALIVE, true); + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_WRITE_HIGH_WATERMARK * 1024) + .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, DEFAULT_WRITE_LOW_WATERMARK * 1024) + .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT); - ChannelFuture f; if (startupAddress != null) { f = b.bind(startupAddress.getHostAddress(), port).sync(); } else { f = b.bind(port).sync(); } - + } catch (InterruptedException e) { + LOGGER.error("Interrupted while binding port {}", port, e); + return; + } + + try { InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress(); address = isa.getHostString(); - LOGGER.debug("address from tcphandler: " + address); - port = isa.getPort(); + + // Update port, as it may have been specified as 0 + this.port = isa.getPort(); + + LOGGER.debug("address from tcphandler: {}", address); isOnlineFuture.set(true); - LOGGER.info("Switch listener started and ready to accept incoming connections on port: " + port); + LOGGER.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port); f.channel().closeFuture().sync(); - } catch (InterruptedException ex) { - LOGGER.error(ex.getMessage(), ex); + } catch (InterruptedException e) { + LOGGER.error("Interrupted while waiting for port {} shutdown", port, e); } finally { shutdown(); } @@ -145,63 +156,53 @@ public class TcpHandler implements ServerFacade { @Override public void operationComplete( - io.netty.util.concurrent.Future downResult) throws Exception { + final io.netty.util.concurrent.Future downResult) throws Exception { result.set(downResult.isSuccess()); - result.setException(downResult.cause()); + if (downResult.cause() != null) { + result.setException(downResult.cause()); + } } - + }); return result; } - + /** - * + * * @return number of connected clients / channels */ public int getNumberOfConnections() { return channelInitializer.size(); } - - /** - * @return channelInitializer providing channels - */ - public PublishingChannelInitializer getChannelInitializer() { - return channelInitializer; - } - - /** - * Sets and starts TCPHandler. - * - * @param args - * @throws Exception - */ - public static void main(String[] args) throws Exception { - int port; - if (args.length > 0) { - port = Integer.parseInt(args[0]); - } else { - port = 6633; - } - new Thread(new TcpHandler(port)).start(); - } - + @Override public ListenableFuture getIsOnlineFuture() { return isOnlineFuture; } - + /** * @return the port */ public int getPort() { return port; } - + /** * @return the address */ public String getAddress() { return address; } - + + /** + * @param channelInitializer + */ + public void setChannelInitializer(TcpChannelInitializer channelInitializer) { + this.channelInitializer = channelInitializer; + } + + @Override + public void setThreadConfig(ThreadConfiguration threadConfig) { + this.threadConfig = threadConfig; + } }