2 * Copyright (c) 2013 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowjava.protocol.impl.core;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.SettableFuture;
13 import io.netty.bootstrap.ServerBootstrap;
14 import io.netty.buffer.PooledByteBufAllocator;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelOption;
17 import io.netty.channel.EventLoopGroup;
18 import io.netty.channel.WriteBufferWaterMark;
19 import io.netty.channel.epoll.EpollEventLoopGroup;
20 import io.netty.channel.epoll.EpollServerSocketChannel;
21 import io.netty.channel.nio.NioEventLoopGroup;
22 import io.netty.channel.socket.ServerSocketChannel;
23 import io.netty.channel.socket.nio.NioServerSocketChannel;
24 import io.netty.handler.logging.LogLevel;
25 import io.netty.handler.logging.LoggingHandler;
26 import java.net.InetAddress;
27 import java.net.InetSocketAddress;
28 import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Class implementing server over TCP / TLS for handling incoming connections.
35 * @author michal.polkorab
37 public class TcpHandler implements ServerFacade {
39 * High/low write watermarks
41 private static final int DEFAULT_WRITE_HIGH_WATERMARK = 64 * 1024;
42 private static final int DEFAULT_WRITE_LOW_WATERMARK = 32 * 1024;
44 * Write spin count. This tells netty to immediately retry a non-blocking
45 * write this many times before moving on to selecting.
47 private static final int DEFAULT_WRITE_SPIN_COUNT = 16;
49 private static final Logger LOG = LoggerFactory.getLogger(TcpHandler.class);
52 private String address;
53 private final InetAddress startupAddress;
54 private final Runnable readyRunnable;
55 private EventLoopGroup workerGroup;
56 private EventLoopGroup bossGroup;
57 private final SettableFuture<Boolean> isOnlineFuture = SettableFuture.create();
59 private TcpChannelInitializer channelInitializer;
61 private Class<? extends ServerSocketChannel> socketChannelClass;
64 * Constructor of TCPHandler that listens on selected port.
66 * @param port listening port of TCPHandler server
68 public TcpHandler(final int port, final Runnable readyRunnable) {
69 this(null, port, readyRunnable);
73 * Constructor of TCPHandler that listens on selected address and port.
74 * @param address listening address of TCPHandler server
75 * @param port listening port of TCPHandler server
77 public TcpHandler(final InetAddress address, final int port, final Runnable readyRunnable) {
79 startupAddress = address;
80 this.readyRunnable = readyRunnable;
84 * Starts server on selected port.
87 @SuppressWarnings("checkstyle:IllegalCatch")
90 * We generally do not perform IO-unrelated tasks, so we want to have
91 * all outstanding tasks completed before the executing thread goes
94 * Any other setting means netty will measure the time it spent selecting
95 * and spend roughly proportional time executing tasks.
97 //workerGroup.setIoRatio(100);
99 final ChannelFuture f;
101 ServerBootstrap bootstrap = new ServerBootstrap();
102 bootstrap.group(bossGroup, workerGroup)
103 .channel(socketChannelClass)
104 .handler(new LoggingHandler(LogLevel.DEBUG))
105 .childHandler(channelInitializer)
106 .option(ChannelOption.SO_BACKLOG, 128)
107 .option(ChannelOption.SO_REUSEADDR, true)
108 .childOption(ChannelOption.SO_KEEPALIVE, true)
109 .childOption(ChannelOption.TCP_NODELAY , true)
110 .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
111 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
112 new WriteBufferWaterMark(DEFAULT_WRITE_LOW_WATERMARK, DEFAULT_WRITE_HIGH_WATERMARK))
113 .childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
115 if (startupAddress != null) {
116 f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
118 f = bootstrap.bind(port).sync();
120 } catch (InterruptedException e) {
121 LOG.error("Interrupted while binding port {}", port, e);
123 } catch (Throwable throwable) {
124 // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
125 LOG.error("Error while binding address {} and port {}", startupAddress, port, throwable);
130 InetSocketAddress isa = (InetSocketAddress) f.channel().localAddress();
131 address = isa.getHostString();
133 // Update port, as it may have been specified as 0
134 port = isa.getPort();
136 LOG.debug("address from tcphandler: {}", address);
137 LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
139 isOnlineFuture.set(true);
141 // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
142 f.channel().closeFuture().sync();
143 } catch (InterruptedException e) {
144 LOG.error("Interrupted while waiting for port {} shutdown", port, e);
151 * Shuts down {@link TcpHandler}}.
154 public ListenableFuture<Boolean> shutdown() {
155 final SettableFuture<Boolean> result = SettableFuture.create();
156 workerGroup.shutdownGracefully();
157 // boss will shutdown as soon, as worker is down
158 bossGroup.shutdownGracefully().addListener(downResult -> {
159 result.set(downResult.isSuccess());
160 if (downResult.cause() != null) {
161 result.setException(downResult.cause());
168 * Returns the number of connected clients / channels.
170 * @return number of connected clients / channels
172 public int getNumberOfConnections() {
173 return channelInitializer.size();
177 public ListenableFuture<Boolean> getIsOnlineFuture() {
178 return isOnlineFuture;
181 public int getPort() {
185 public String getAddress() {
189 public void setChannelInitializer(final TcpChannelInitializer channelInitializer) {
190 this.channelInitializer = channelInitializer;
194 @Deprecated(since = "0.17.2", forRemoval = true)
195 public void setThreadConfig(final ThreadConfiguration threadConfig) {
200 * Initiate event loop groups.
202 * @param threadConfiguration number of threads to be created, if not specified in threadConfig
204 public void initiateEventLoopGroups(final ThreadConfiguration threadConfiguration, final boolean isEpollEnabled) {
205 if (isEpollEnabled) {
206 initiateEpollEventLoopGroups(threadConfiguration);
208 initiateNioEventLoopGroups(threadConfiguration);
213 * Initiate Nio event loop groups.
215 * @param threadConfiguration number of threads to be created, if not specified in threadConfig
217 public void initiateNioEventLoopGroups(final ThreadConfiguration threadConfiguration) {
218 socketChannelClass = NioServerSocketChannel.class;
219 if (threadConfiguration != null) {
220 bossGroup = new NioEventLoopGroup(threadConfiguration.getBossThreadCount());
221 workerGroup = new NioEventLoopGroup(threadConfiguration.getWorkerThreadCount());
223 bossGroup = new NioEventLoopGroup();
224 workerGroup = new NioEventLoopGroup();
226 ((NioEventLoopGroup)workerGroup).setIoRatio(100);
230 * Initiate Epoll event loop groups with Nio as fall back.
232 * @param threadConfiguration the ThreadConfiguration
234 @SuppressWarnings("checkstyle:IllegalCatch")
235 protected void initiateEpollEventLoopGroups(final ThreadConfiguration threadConfiguration) {
237 socketChannelClass = EpollServerSocketChannel.class;
238 if (threadConfiguration != null) {
239 bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
240 workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
242 bossGroup = new EpollEventLoopGroup();
243 workerGroup = new EpollEventLoopGroup();
245 ((EpollEventLoopGroup)workerGroup).setIoRatio(100);
247 } catch (RuntimeException ex) {
248 LOG.debug("Epoll initiation failed");
252 initiateNioEventLoopGroups(threadConfiguration);
255 public EventLoopGroup getWorkerGroup() {