package org.opendaylight.openflowjava.protocol.impl.core;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.concurrent.GenericFutureListener;
-
-import io.netty.channel.epoll.Epoll;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-
import java.net.InetAddress;
import java.net.InetSocketAddress;
-
import org.opendaylight.openflowjava.protocol.api.connection.ThreadConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* Class implementing server over TCP / TLS for handling incoming connections.
*
private int port;
private String address;
private final InetAddress startupAddress;
+ private final Runnable readyRunnable;
private EventLoopGroup workerGroup;
private EventLoopGroup bossGroup;
private final SettableFuture<Boolean> isOnlineFuture;
*
* @param port listening port of TCPHandler server
*/
- public TcpHandler(final int port) {
- this(null, port);
+ public TcpHandler(final int port, Runnable readyRunnable) {
+ this(null, port, readyRunnable);
}
/**
* @param address listening address of TCPHandler server
* @param port listening port of TCPHandler server
*/
- public TcpHandler(final InetAddress address, final int port) {
+ public TcpHandler(final InetAddress address, final int port, Runnable readyRunnable) {
this.port = port;
this.startupAddress = address;
isOnlineFuture = SettableFuture.create();
+ this.readyRunnable = readyRunnable;
}
/**
* Starts server on selected port.
*/
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void run() {
/*
* We generally do not perform IO-unrelated tasks, so we want to have
final ChannelFuture f;
try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup)
.channel(socketChannelClass)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(channelInitializer)
.childOption(ChannelOption.WRITE_SPIN_COUNT, DEFAULT_WRITE_SPIN_COUNT);
if (startupAddress != null) {
- f = b.bind(startupAddress.getHostAddress(), port).sync();
+ f = bootstrap.bind(startupAddress.getHostAddress(), port).sync();
} else {
- f = b.bind(port).sync();
+ f = bootstrap.bind(port).sync();
}
} catch (InterruptedException e) {
LOG.error("Interrupted while binding port {}", port, e);
return;
+ } catch (Throwable throwable) {
+ // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
+ LOG.error("Error while binding address {} and port {}", startupAddress, port, throwable);
+ throw throwable;
}
try {
LOG.debug("address from tcphandler: {}", address);
isOnlineFuture.set(true);
LOG.info("Switch listener started and ready to accept incoming tcp/tls connections on port: {}", port);
+
+ readyRunnable.run();
+
+ // This waits until this channel is closed, and rethrows the cause of the failure if this future failed.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for port {} shutdown", port, e);
}
/**
- * Shuts down {@link TcpHandler}}
+ * Shuts down {@link TcpHandler}}.
*/
@Override
public ListenableFuture<Boolean> shutdown() {
final SettableFuture<Boolean> result = SettableFuture.create();
workerGroup.shutdownGracefully();
// boss will shutdown as soon, as worker is down
- bossGroup.shutdownGracefully().addListener(new GenericFutureListener<io.netty.util.concurrent.Future<Object>>() {
-
- @Override
- public void operationComplete(
- final io.netty.util.concurrent.Future<Object> downResult) throws Exception {
- result.set(downResult.isSuccess());
- if (downResult.cause() != null) {
- result.setException(downResult.cause());
- }
+ bossGroup.shutdownGracefully().addListener(downResult -> {
+ result.set(downResult.isSuccess());
+ if (downResult.cause() != null) {
+ result.setException(downResult.cause());
}
-
});
return result;
}
/**
+ * Returns the number of connected clients / channels.
*
* @return number of connected clients / channels
*/
return isOnlineFuture;
}
- /**
- * @return the port
- */
public int getPort() {
return port;
}
- /**
- * @return the address
- */
public String getAddress() {
return address;
}
- /**
- * @param channelInitializer
- */
public void setChannelInitializer(TcpChannelInitializer channelInitializer) {
this.channelInitializer = channelInitializer;
}
}
/**
- * Initiate event loop groups
+ * Initiate event loop groups.
+ *
* @param threadConfiguration number of threads to be created, if not specified in threadConfig
*/
public void initiateEventLoopGroups(ThreadConfiguration threadConfiguration, boolean isEpollEnabled) {
- if(isEpollEnabled) {
+ if (isEpollEnabled) {
initiateEpollEventLoopGroups(threadConfiguration);
} else {
initiateNioEventLoopGroups(threadConfiguration);
}
/**
- * Initiate Nio event loop groups
+ * Initiate Nio event loop groups.
+ *
* @param threadConfiguration number of threads to be created, if not specified in threadConfig
*/
public void initiateNioEventLoopGroups(ThreadConfiguration threadConfiguration) {
}
/**
- * Initiate Epoll event loop groups with Nio as fall back
- * @param threadConfiguration
+ * Initiate Epoll event loop groups with Nio as fall back.
+ *
+ * @param threadConfiguration the ThreadConfiguration
*/
+ @SuppressWarnings("checkstyle:IllegalCatch")
protected void initiateEpollEventLoopGroups(ThreadConfiguration threadConfiguration) {
try {
socketChannelClass = EpollServerSocketChannel.class;
if (threadConfiguration != null) {
- bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
+ bossGroup = new EpollEventLoopGroup(threadConfiguration.getBossThreadCount());
workerGroup = new EpollEventLoopGroup(threadConfiguration.getWorkerThreadCount());
} else {
bossGroup = new EpollEventLoopGroup();
}
((EpollEventLoopGroup)workerGroup).setIoRatio(100);
return;
- } catch (Throwable ex) {
+ } catch (RuntimeException ex) {
LOG.debug("Epoll initiation failed");
}
initiateNioEventLoopGroups(threadConfiguration);
}
- /**
- * @return workerGroup
- */
public EventLoopGroup getWorkerGroup() {
return workerGroup;
}
-
}