2 * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowjava.protocol.impl.core;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.SettableFuture;
13 import io.netty.bootstrap.ServerBootstrap;
14 import io.netty.buffer.PooledByteBufAllocator;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelOption;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.WriteBufferWaterMark;
19 import io.netty.channel.epoll.EpollEventLoopGroup;
20 import io.netty.channel.epoll.EpollServerSocketChannel;
21 import io.netty.channel.nio.NioEventLoopGroup;
22 import io.netty.channel.socket.ServerSocketChannel;
23 import io.netty.channel.socket.nio.NioServerSocketChannel;
24 import io.netty.handler.logging.LogLevel;
25 import io.netty.handler.logging.LoggingHandler;
26 import java.net.InetAddress;
27 import java.net.InetSocketAddress;
28 import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Class implementing server over TCP / TLS for handling incoming connections.
35 * @author michal.polkorab
37 public class TcpHandler implements ServerFacade {
39 * High/low write watermarks
41 private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64 * 1024;
42 private static final int DEFAULT_WRITE_LOW_WATERMARK = 32 * 1024;
44 * Write spin count. This tells netty to immediately retry a non-blocking
45 * write this many times before moving on to selecting.
47 private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
49 private static final Logger LOG = LoggerFactory.getLogger(TcpHandler.class);
52 private String address;
53 private final InetAddress startupAddress;
54 private final Runnable readyRunnable;
55 private EventLoopGroup workerGroup;
56 private EventLoopGroup bossGroup;
57 private final SettableFuture<Boolean> isOnlineFuture;
58 private ThreadConfiguration threadConfig;
60 private TcpChannelInitializer channelInitializer;
62 private Class<? extends ServerSocketChannel> socketChannelClass;
65 * Constructor of TCPHandler that listens on selected port.
67 * @param port listening port of TCPHandler server
69 public TcpHandler(final int port, Runnable readyRunnable) {
70 this(null, port, readyRunnable);
74 * Constructor of TCPHandler that listens on selected address and port.
75 * @param address listening address of TCPHandler server
76 * @param port listening port of TCPHandler server
78 public TcpHandler(final InetAddress address, final int port, Runnable readyRunnable) {
80 this.startupAddress = address;
81 isOnlineFuture = SettableFuture.create();
82 this.readyRunnable = readyRunnable;
86 * Starts server on selected port.
89 @SuppressWarnings("checkstyle:IllegalCatch")
92 * We generally do not perform IO-unrelated tasks, so we want to have
93 * all outstanding tasks completed before the executing thread goes
96 * Any other setting means netty will measure the time it spent selecting
97 * and spend roughly proportional time executing tasks.
99 //workerGroup.setIoRatio(100);
101 final ChannelFuture f;
103 ServerBootstrap bootstrap = new ServerBootstrap();
104 bootstrap.group(bossGroup, workerGroup)
105 .channel(socketChannelClass)
106 .handler(new LoggingHandler(LogLevel.DEBUG))
107 .childHandler(channelInitializer)
108 .option(ChannelOption.SO_BACKLOG, 128)
109 .option(ChannelOption.SO_REUSEADDR, true)
110 .childOption(ChannelOption.SO_KEEPALIVE, true)
111 .childOption(ChannelOption.TCP_NODELAY , true)
112 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
113 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
114 new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
115 .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
117 if (startupAddress != null) {
118 f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
120 f = bootstrap.bind(port).sync();
122 } catch (InterruptedException e) {
123 LOG.error("Interrupted while binding port {}", port, e);
125 } catch (Throwable throwable) {
126 // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
127 LOG.error("Error while binding address {} and port {}", startupAddress, port, throwable);
132 InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
133 address = isa.getHostString();
135 // Update port, as it may have been specified as 0
136 this.port = isa.getPort();
138 LOG.debug("address from tcphandler: {}", address);
139 isOnlineFuture.set(true);
140 LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
144 // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
145 f.channel().closeFuture().sync();
146 } catch (InterruptedException e) {
147 LOG.error("Interrupted while waiting for port {} shutdown", port, e);
154 * Shuts down {@link TcpHandler}}.
157 public ListenableFuture<Boolean> shutdown() {
158 final SettableFuture<Boolean> result = SettableFuture.create();
159 workerGroup.shutdownGracefully();
160 // boss will shutdown as soon, as worker is down
161 bossGroup.shutdownGracefully().addListener(downResult -> {
162 result.set(downResult.isSuccess());
163 if (downResult.cause() != null) {
164 result.setException(downResult.cause());
171 * Returns the number of connected clients / channels.
173 * @return number of connected clients / channels
175 public int getNumberOfConnections() {
176 return channelInitializer.size();
180 public ListenableFuture<Boolean> getIsOnlineFuture() {
181 return isOnlineFuture;
184 public int getPort() {
188 public String getAddress() {
192 public void setChannelInitializer(TcpChannelInitializer channelInitializer) {
193 this.channelInitializer = channelInitializer;
197 public void setThreadConfig(ThreadConfiguration threadConfig) {
198 this.threadConfig = threadConfig;
202 * Initiate event loop groups.
204 * @param threadConfiguration number of threads to be created, if not specified in threadConfig
206 public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
208 if (isEpollEnabled) {
209 initiateEpollEventLoopGroups(threadConfiguration);
211 initiateNioEventLoopGroups(threadConfiguration);
216 * Initiate Nio event loop groups.
218 * @param threadConfiguration number of threads to be created, if not specified in threadConfig
220 public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
221 socketChannelClass = NioServerSocketChannel.class;
222 if (threadConfiguration != null) {
223 bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount());
224 workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
226 bossGroup = new NioEventLoopGroup();
227 workerGroup = new NioEventLoopGroup();
229 ((NioEventLoopGroup)workerGroup).setIoRatio(100);
233 * Initiate Epoll event loop groups with Nio as fall back.
235 * @param threadConfiguration the ThreadConfiguration
237 @SuppressWarnings("checkstyle:IllegalCatch")
238 protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
240 socketChannelClass = EpollServerSocketChannel.class;
241 if (threadConfiguration != null) {
242 bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
243 workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
245 bossGroup = new EpollEventLoopGroup();
246 workerGroup = new EpollEventLoopGroup();
248 ((EpollEventLoopGroup)workerGroup).setIoRatio(100);
250 } catch (RuntimeException ex) {
251 LOG.debug("Epoll initiation failed");
255 initiateNioEventLoopGroups(threadConfiguration);
258 public EventLoopGroup getWorkerGroup() {