import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringEncoder;
}
LOG.info("startOvsdbManager: Starting");
- new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
+ ovsdbManager(ovsdbListenerIp, ovsdbListenerPort);
return true;
}
final ICertificateManager certificateManagerSrv,
final String[] protocols, final String[] cipherSuites) {
if (!singletonCreated.getAndSet(true)) {
- new Thread(() -> ovsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort,
- certificateManagerSrv, protocols, cipherSuites)).start();
+ ovsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort, certificateManagerSrv, protocols, cipherSuites);
return true;
} else {
return false;
}
@Override
- public synchronized boolean restartOvsdbManagerWithSsl(final String ovsdbListenIp,
- final int ovsdbListenPort,
- final ICertificateManager certificateManagerSrv,
- final String[] protocols,
- final String[] cipherSuites) {
+ public synchronized boolean restartOvsdbManagerWithSsl(final String ovsdbListenIp, final int ovsdbListenPort,
+ final ICertificateManager certificateManagerSrv, final String[] protocols, final String[] cipherSuites) {
if (singletonCreated.getAndSet(false) && serverChannel != null) {
serverChannel.close();
LOG.info("Server channel closed");
*/
@SuppressWarnings("checkstyle:IllegalCatch")
private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
- final String[] protocols, final String[] cipherSuites) {
+ final String[] protocols, final String[] cipherSuites) {
- ServerBootstrap serverBootstrap = bootstrapFactory.newServer()
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(final SocketChannel channel) throws Exception {
- LOG.debug("New Passive channel created : {}", channel);
- if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
- /* Add SSL handler first if SSL context is provided */
- SSLContext sslContext = certificateManagerSrv.getServerContext();
+ bootstrapFactory.newServer()
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(final SocketChannel channel) throws Exception {
+ LOG.debug("New Passive channel created : {}", channel);
+ if (certificateManagerSrv != null) {
+ /* Add SSL handler first if SSL context is provided */
+ final SSLContext sslContext = certificateManagerSrv.getServerContext();
+ if (sslContext != null) {
SSLEngine engine = sslContext.createSSLEngine();
engine.setUseClientMode(false); // work in a server mode
engine.setNeedClientAuth(true); // need client authentication
}
channel.pipeline().addLast("ssl", new SslHandler(engine));
}
-
- channel.pipeline().addLast(
- new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
- UTF8_ENCODER,
- new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
- new ReadTimeoutHandler(READ_TIMEOUT),
- new ExceptionHandler(OvsdbConnectionService.this));
-
- handleNewPassiveConnection(channel);
}
- });
- try {
+ channel.pipeline().addLast(
+ new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
+ UTF8_ENCODER,
+ new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
+ new ReadTimeoutHandler(READ_TIMEOUT),
+ new ExceptionHandler(OvsdbConnectionService.this));
+
+ handleNewPassiveConnection(channel);
+ }
+ })
// Start the server.
- ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
- Channel serverListenChannel = channelFuture.channel();
- serverChannel = serverListenChannel;
- // Wait until the server socket is closed.
- serverListenChannel.closeFuture().sync();
- } catch (InterruptedException e) {
- LOG.error("Thread interrupted", e);
- } catch (Throwable throwable) {
- // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
- LOG.error("Error while binding to address {}, port {}", ip, port, throwable);
- throw throwable;
- }
+ .bind(ip, port)
+ // Propagate the channel when its ready
+ .addListener((ChannelFutureListener) future -> {
+ if (future.isSuccess()) {
+ serverChannel = future.channel();
+ } else {
+ LOG.error("Error while binding to address {}, port {}", ip, port, future.cause());
+ }
+ });
}
private static void handleNewPassiveConnection(final OvsdbClient client) {