Add NettyBootstrapFactory to hold OVSDB network threads
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbConnectionService.java
index 4d21b6cf8bc5f2a6de1f58714101bdb4eda0bf11..801db220cc5288a5785223034d5a4747c31f7eef 100644 (file)
@@ -8,22 +8,18 @@
 
 package org.opendaylight.ovsdb.lib.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
@@ -109,6 +105,8 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
     private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
 
+    private final NettyBootstrapFactory bootstrapFactory;
+
     private volatile boolean useSSL = false;
     private final ICertificateManager certManagerSrv;
 
@@ -120,8 +118,9 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private volatile int listenerPort = 6640;
 
     @Inject
-    public OvsdbConnectionService(@Reference(filter = "type=default-certificate-manager")
-            final ICertificateManager certManagerSrv) {
+    public OvsdbConnectionService(final NettyBootstrapFactory bootstrapFactory,
+            @Reference(filter = "type=default-certificate-manager") final ICertificateManager certManagerSrv) {
+        this.bootstrapFactory = requireNonNull(bootstrapFactory);
         this.certManagerSrv = certManagerSrv;
     }
 
@@ -146,34 +145,30 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     @SuppressWarnings("checkstyle:IllegalCatch")
     public OvsdbClient connectWithSsl(final InetAddress address, final int port,
             final ICertificateManager certificateManagerSrv) {
-        try {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(new NioEventLoopGroup());
-            bootstrap.channel(NioSocketChannel.class);
-            bootstrap.option(ChannelOption.TCP_NODELAY, true);
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
 
-            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-                @Override
-                public void initChannel(final SocketChannel channel) throws Exception {
-                    if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
-                        SSLContext sslContext = certificateManagerSrv.getServerContext();
-                        /* First add ssl handler if ssl context is given */
-                        SSLEngine engine =
-                            sslContext.createSSLEngine(address.toString(), port);
-                        engine.setUseClientMode(true);
-                        channel.pipeline().addLast("ssl", new SslHandler(engine));
-                    }
-                    channel.pipeline().addLast(
+        Bootstrap bootstrap = bootstrapFactory.newClient()
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(final SocketChannel channel) throws Exception {
+                        if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+                            SSLContext sslContext = certificateManagerSrv.getServerContext();
+                            /* First add ssl handler if ssl context is given */
+                            SSLEngine engine =
+                                    sslContext.createSSLEngine(address.toString(), port);
+                            engine.setUseClientMode(true);
+                            channel.pipeline().addLast("ssl", new SslHandler(engine));
+                        }
+                        channel.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
                             UTF8_ENCODER,
                             new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
                             new ReadTimeoutHandler(READ_TIMEOUT),
                             new ExceptionHandler(OvsdbConnectionService.this));
-                }
-            });
+                    }
+                });
 
+        try {
             ChannelFuture future = bootstrap.connect(address, port).sync();
             Channel channel = future.channel();
             return getChannelClient(channel, ConnectionType.ACTIVE, SocketConnectionType.SSL);
@@ -245,13 +240,13 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     public synchronized boolean startOvsdbManager() {
         final int ovsdbListenerPort = this.listenerPort;
         final String ovsdbListenerIp = this.listenerIp;
-        if (!singletonCreated.getAndSet(true)) {
-            LOG.info("startOvsdbManager: Starting");
-            new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
-            return true;
-        } else {
+        if (singletonCreated.getAndSet(true)) {
             return false;
         }
+
+        LOG.info("startOvsdbManager: Starting");
+        new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
+        return true;
     }
 
     /**
@@ -313,54 +308,48 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
                                             final String[] protocols, final String[] cipherSuites) {
-        EventLoopGroup bossGroup = new NioEventLoopGroup();
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
-        try {
-            ServerBootstrap serverBootstrap = new ServerBootstrap();
-            serverBootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .option(ChannelOption.SO_BACKLOG, 100)
-                    .handler(new LoggingHandler(LogLevel.INFO))
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-                        @Override
-                        public void initChannel(final SocketChannel channel) throws Exception {
-                            LOG.debug("New Passive channel created : {}", channel);
-                            if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
-                                /* Add SSL handler first if SSL context is provided */
-                                SSLContext sslContext = certificateManagerSrv.getServerContext();
-                                SSLEngine engine = sslContext.createSSLEngine();
-                                engine.setUseClientMode(false); // work in a server mode
-                                engine.setNeedClientAuth(true); // need client authentication
-                                if (protocols != null && protocols.length > 0) {
-                                    //Set supported protocols
-                                    engine.setEnabledProtocols(protocols);
-                                    LOG.debug("Supported ssl protocols {}",
-                                            Arrays.toString(engine.getSupportedProtocols()));
-                                    LOG.debug("Enabled ssl protocols {}",
-                                            Arrays.toString(engine.getEnabledProtocols()));
-                                }
-                                if (cipherSuites != null && cipherSuites.length > 0) {
-                                    //Set supported cipher suites
-                                    engine.setEnabledCipherSuites(cipherSuites);
-                                    LOG.debug("Enabled cipher suites {}",
-                                            Arrays.toString(engine.getEnabledCipherSuites()));
-                                }
-                                channel.pipeline().addLast("ssl", new SslHandler(engine));
+
+        ServerBootstrap serverBootstrap = bootstrapFactory.newServer()
+                .handler(new LoggingHandler(LogLevel.INFO))
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(final SocketChannel channel) throws Exception {
+                        LOG.debug("New Passive channel created : {}", channel);
+                        if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+                            /* Add SSL handler first if SSL context is provided */
+                            SSLContext sslContext = certificateManagerSrv.getServerContext();
+                            SSLEngine engine = sslContext.createSSLEngine();
+                            engine.setUseClientMode(false); // work in a server mode
+                            engine.setNeedClientAuth(true); // need client authentication
+                            if (protocols != null && protocols.length > 0) {
+                                //Set supported protocols
+                                engine.setEnabledProtocols(protocols);
+                                LOG.debug("Supported ssl protocols {}",
+                                    Arrays.toString(engine.getSupportedProtocols()));
+                                LOG.debug("Enabled ssl protocols {}",
+                                    Arrays.toString(engine.getEnabledProtocols()));
+                            }
+                            if (cipherSuites != null && cipherSuites.length > 0) {
+                                //Set supported cipher suites
+                                engine.setEnabledCipherSuites(cipherSuites);
+                                LOG.debug("Enabled cipher suites {}",
+                                    Arrays.toString(engine.getEnabledCipherSuites()));
                             }
+                            channel.pipeline().addLast("ssl", new SslHandler(engine));
+                        }
+
+                        channel.pipeline().addLast(
+                            new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
+                            UTF8_ENCODER,
+                            new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
+                            new ReadTimeoutHandler(READ_TIMEOUT),
+                            new ExceptionHandler(OvsdbConnectionService.this));
 
-                            channel.pipeline().addLast(
-                                 new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
-                                 UTF8_ENCODER,
-                                 new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
-                                 new ReadTimeoutHandler(READ_TIMEOUT),
-                                 new ExceptionHandler(OvsdbConnectionService.this));
+                        handleNewPassiveConnection(channel);
+                    }
+                });
 
-                            handleNewPassiveConnection(channel);
-                        }
-                    });
-            serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
-            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR,
-                    new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+        try {
             // Start the server.
             ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
             Channel serverListenChannel = channelFuture.channel();
@@ -373,10 +362,6 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
             // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
             LOG.error("Error while binding to address {}, port {}", ip, port, throwable);
             throw throwable;
-        } finally {
-            // Shut down all event loops to terminate all threads.
-            bossGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
         }
     }