2 * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowjava.protocol.impl.core;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.SettableFuture;
13 import io.netty.bootstrap.ServerBootstrap;
14 import io.netty.buffer.PooledByteBufAllocator;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelOption;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.WriteBufferWaterMark;
19 import io.netty.channel.epoll.EpollEventLoopGroup;
20 import io.netty.channel.epoll.EpollServerSocketChannel;
21 import io.netty.channel.nio.NioEventLoopGroup;
22 import io.netty.channel.socket.ServerSocketChannel;
23 import io.netty.channel.socket.nio.NioServerSocketChannel;
24 import io.netty.handler.logging.LogLevel;
25 import io.netty.handler.logging.LoggingHandler;
26 import java.net.InetAddress;
27 import java.net.InetSocketAddress;
28 import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Class implementing server over TCP / TLS for handling incoming connections.
35 * @author michal.polkorab
37 public class TcpHandler implements ServerFacade {
39 * High/low write watermarks
41 private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64 * 1024;
42 private static final int DEFAULT_WRITE_LOW_WATERMARK = 32 * 1024;
44 * Write spin count. This tells netty to immediately retry a non-blocking
45 * write this many times before moving on to selecting.
47 private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
49 private static final Logger LOG = LoggerFactory.getLogger(TcpHandler.class);
52 private String address;
53 private final InetAddress startupAddress;
54 private final Runnable readyRunnable;
55 private EventLoopGroup workerGroup;
56 private EventLoopGroup bossGroup;
57 private final SettableFuture<Boolean> isOnlineFuture = SettableFuture.create();
58 private ThreadConfiguration threadConfig;
60 private TcpChannelInitializer channelInitializer;
62 private Class<? extends ServerSocketChannel> socketChannelClass;
65 * Constructor of TCPHandler that listens on selected port.
67 * @param port listening port of TCPHandler server
69 public TcpHandler(final int port, Runnable readyRunnable) {
70 this(null, port, readyRunnable);
74 * Constructor of TCPHandler that listens on selected address and port.
75 * @param address listening address of TCPHandler server
76 * @param port listening port of TCPHandler server
78 public TcpHandler(final InetAddress address, final int port, Runnable readyRunnable) {
80 this.startupAddress = address;
81 this.readyRunnable = readyRunnable;
85 * Starts server on selected port.
88 @SuppressWarnings("checkstyle:IllegalCatch")
91 * We generally do not perform IO-unrelated tasks, so we want to have
92 * all outstanding tasks completed before the executing thread goes
95 * Any other setting means netty will measure the time it spent selecting
96 * and spend roughly proportional time executing tasks.
98 //workerGroup.setIoRatio(100);
100 final ChannelFuture f;
102 ServerBootstrap bootstrap = new ServerBootstrap();
103 bootstrap.group(bossGroup, workerGroup)
104 .channel(socketChannelClass)
105 .handler(new LoggingHandler(LogLevel.DEBUG))
106 .childHandler(channelInitializer)
107 .option(ChannelOption.SO_BACKLOG, 128)
108 .option(ChannelOption.SO_REUSEADDR, true)
109 .childOption(ChannelOption.SO_KEEPALIVE, true)
110 .childOption(ChannelOption.TCP_NODELAY , true)
111 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
112 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
113 new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
114 .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
116 if (startupAddress != null) {
117 f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
119 f = bootstrap.bind(port).sync();
121 } catch (InterruptedException e) {
122 LOG.error("Interrupted while binding port {}", port, e);
124 } catch (Throwable throwable) {
125 // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
126 LOG.error("Error while binding address {} and port {}", startupAddress, port, throwable);
131 InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
132 address = isa.getHostString();
134 // Update port, as it may have been specified as 0
135 this.port = isa.getPort();
137 LOG.debug("address from tcphandler: {}", address);
138 LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
140 isOnlineFuture.set(true);
142 // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
143 f.channel().closeFuture().sync();
144 } catch (InterruptedException e) {
145 LOG.error("Interrupted while waiting for port {} shutdown", port, e);
152 * Shuts down {@link TcpHandler}}.
155 public ListenableFuture<Boolean> shutdown() {
156 final SettableFuture<Boolean> result = SettableFuture.create();
157 workerGroup.shutdownGracefully();
158 // boss will shutdown as soon, as worker is down
159 bossGroup.shutdownGracefully().addListener(downResult -> {
160 result.set(downResult.isSuccess());
161 if (downResult.cause() != null) {
162 result.setException(downResult.cause());
169 * Returns the number of connected clients / channels.
171 * @return number of connected clients / channels
173 public int getNumberOfConnections() {
174 return channelInitializer.size();
178 public ListenableFuture<Boolean> getIsOnlineFuture() {
179 return isOnlineFuture;
182 public int getPort() {
186 public String getAddress() {
190 public void setChannelInitializer(TcpChannelInitializer channelInitializer) {
191 this.channelInitializer = channelInitializer;
195 public void setThreadConfig(ThreadConfiguration threadConfig) {
196 this.threadConfig = threadConfig;
200 * Initiate event loop groups.
202 * @param threadConfiguration number of threads to be created, if not specified in threadConfig
204 public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
205 if (isEpollEnabled) {
206 initiateEpollEventLoopGroups(threadConfiguration);
208 initiateNioEventLoopGroups(threadConfiguration);
213 * Initiate Nio event loop groups.
215 * @param threadConfiguration number of threads to be created, if not specified in threadConfig
217 public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
218 socketChannelClass = NioServerSocketChannel.class;
219 if (threadConfiguration != null) {
220 bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount());
221 workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
223 bossGroup = new NioEventLoopGroup();
224 workerGroup = new NioEventLoopGroup();
226 ((NioEventLoopGroup)workerGroup).setIoRatio(100);
230 * Initiate Epoll event loop groups with Nio as fall back.
232 * @param threadConfiguration the ThreadConfiguration
234 @SuppressWarnings("checkstyle:IllegalCatch")
235 protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
237 socketChannelClass = EpollServerSocketChannel.class;
238 if (threadConfiguration != null) {
239 bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
240 workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
242 bossGroup = new EpollEventLoopGroup();
243 workerGroup = new EpollEventLoopGroup();
245 ((EpollEventLoopGroup)workerGroup).setIoRatio(100);
247 } catch (RuntimeException ex) {
248 LOG.debug("Epoll initiation failed");
252 initiateNioEventLoopGroups(threadConfiguration);
255 public EventLoopGroup getWorkerGroup() {