Add NettyBootstrapFactory to hold OVSDB network threads 53/86253/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 3 Dec 2019 15:26:59 +0000 (16:26 +0100)
committerAnil Belur <abelur@linuxfoundation.org>
Sun, 9 Feb 2020 06:27:45 +0000 (06:27 +0000)
OvsdbConnectionService is rather bad at using threads efficiently,
as each individual client creates its own EventLoopGroup, which is
not shared with anyone.

This refactors bootstrap generation, so that it is tied to a global
thread factory.

JIRA: OVSDB-411
Change-Id: Ie16e123fa3de87fd4f148c54e610515db3f1de9e
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit b49f959e3024f4385af914b8779c0e31a330cf99)

library/impl/pom.xml
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java [new file with mode: 0644]
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java

index 9c168929f4921ff6dca8fb2000d00f1655e7a354..9ed40487e5d0a44c03efa7cb4d166b86a988322e 100644 (file)
@@ -80,6 +80,11 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       <groupId>javax.inject</groupId>
       <artifactId>javax.inject</artifactId>
     </dependency>
+    <dependency>
+      <groupId>javax.annotation</groupId>
+      <artifactId>javax.annotation-api</artifactId>
+      <optional>true</optional>
+    </dependency>
 
     <!-- Testing Dependencies -->
     <dependency>
diff --git a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java
new file mode 100644 (file)
index 0000000..0449cf0
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright © 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.lib.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A globally-instantiated context for use with OvsdbConnectionService.
+ */
+@Singleton
+public class NettyBootstrapFactory implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyBootstrapFactory.class);
+
+    private final EventLoopGroup bossGroup = new NioEventLoopGroup(0,
+        new ThreadFactoryBuilder().setNameFormat("OVSDB listener-%d").build());
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup(0,
+        new ThreadFactoryBuilder().setNameFormat("OVSDB connection-%d").build());
+
+    @Inject
+    public NettyBootstrapFactory() {
+        LOG.info("OVSDB global Netty context instantiated");
+    }
+
+    Bootstrap newClient() {
+        return new Bootstrap()
+                .group(workerGroup)
+                .channel(NioSocketChannel.class)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+    }
+
+    ServerBootstrap newServer() {
+        return new ServerBootstrap()
+                .group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel.class)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535))
+                .option(ChannelOption.SO_BACKLOG, 100);
+    }
+
+    @PreDestroy
+    @Override
+    public void close() {
+        LOG.info("OVSDB global Netty context terminating");
+        bossGroup.shutdownGracefully().addListener(ignore -> {
+            LOG.info("OVSDB global server group terminated");
+        });
+        workerGroup.shutdownGracefully().addListener(ignore -> {
+            LOG.info("OVSDB global channel group terminated");
+        });
+    }
+}
index 384d9290a19ba2bc02120a03985881d0f1fdde19..763d7eeda1131d5ae5d2161c985d7c04ee9d693c 100644 (file)
@@ -8,22 +8,18 @@
 
 package org.opendaylight.ovsdb.lib.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
@@ -109,6 +105,8 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
     private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
 
+    private final NettyBootstrapFactory bootstrapFactory;
+
     private volatile boolean useSSL = false;
     private final ICertificateManager certManagerSrv;
 
@@ -120,8 +118,9 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private volatile int listenerPort = 6640;
 
     @Inject
-    public OvsdbConnectionService(@Reference(filter = "type=default-certificate-manager")
-            final ICertificateManager certManagerSrv) {
+    public OvsdbConnectionService(final NettyBootstrapFactory bootstrapFactory,
+            @Reference(filter = "type=default-certificate-manager") final ICertificateManager certManagerSrv) {
+        this.bootstrapFactory = requireNonNull(bootstrapFactory);
         this.certManagerSrv = certManagerSrv;
     }
 
@@ -146,34 +145,30 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     @SuppressWarnings("checkstyle:IllegalCatch")
     public OvsdbClient connectWithSsl(final InetAddress address, final int port,
             final ICertificateManager certificateManagerSrv) {
-        try {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(new NioEventLoopGroup());
-            bootstrap.channel(NioSocketChannel.class);
-            bootstrap.option(ChannelOption.TCP_NODELAY, true);
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
 
-            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-                @Override
-                public void initChannel(final SocketChannel channel) throws Exception {
-                    if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
-                        SSLContext sslContext = certificateManagerSrv.getServerContext();
-                        /* First add ssl handler if ssl context is given */
-                        SSLEngine engine =
-                            sslContext.createSSLEngine(address.toString(), port);
-                        engine.setUseClientMode(true);
-                        channel.pipeline().addLast("ssl", new SslHandler(engine));
-                    }
-                    channel.pipeline().addLast(
+        Bootstrap bootstrap = bootstrapFactory.newClient()
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(final SocketChannel channel) throws Exception {
+                        if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+                            SSLContext sslContext = certificateManagerSrv.getServerContext();
+                            /* First add ssl handler if ssl context is given */
+                            SSLEngine engine =
+                                    sslContext.createSSLEngine(address.toString(), port);
+                            engine.setUseClientMode(true);
+                            channel.pipeline().addLast("ssl", new SslHandler(engine));
+                        }
+                        channel.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
                             UTF8_ENCODER,
                             new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
                             new ReadTimeoutHandler(READ_TIMEOUT),
                             new ExceptionHandler(OvsdbConnectionService.this));
-                }
-            });
+                    }
+                });
 
+        try {
             ChannelFuture future = bootstrap.connect(address, port).sync();
             Channel channel = future.channel();
             return getChannelClient(channel, ConnectionType.ACTIVE, SocketConnectionType.SSL);
@@ -245,13 +240,13 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     public synchronized boolean startOvsdbManager() {
         final int ovsdbListenerPort = this.listenerPort;
         final String ovsdbListenerIp = this.listenerIp;
-        if (!singletonCreated.getAndSet(true)) {
-            LOG.info("startOvsdbManager: Starting");
-            new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
-            return true;
-        } else {
+        if (singletonCreated.getAndSet(true)) {
             return false;
         }
+
+        LOG.info("startOvsdbManager: Starting");
+        new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
+        return true;
     }
 
     /**
@@ -313,54 +308,48 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
                                             final String[] protocols, final String[] cipherSuites) {
-        EventLoopGroup bossGroup = new NioEventLoopGroup();
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
-        try {
-            ServerBootstrap serverBootstrap = new ServerBootstrap();
-            serverBootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .option(ChannelOption.SO_BACKLOG, 100)
-                    .handler(new LoggingHandler(LogLevel.INFO))
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-                        @Override
-                        public void initChannel(final SocketChannel channel) throws Exception {
-                            LOG.debug("New Passive channel created : {}", channel);
-                            if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
-                                /* Add SSL handler first if SSL context is provided */
-                                SSLContext sslContext = certificateManagerSrv.getServerContext();
-                                SSLEngine engine = sslContext.createSSLEngine();
-                                engine.setUseClientMode(false); // work in a server mode
-                                engine.setNeedClientAuth(true); // need client authentication
-                                if (protocols != null && protocols.length > 0) {
-                                    //Set supported protocols
-                                    engine.setEnabledProtocols(protocols);
-                                    LOG.debug("Supported ssl protocols {}",
-                                            Arrays.toString(engine.getSupportedProtocols()));
-                                    LOG.debug("Enabled ssl protocols {}",
-                                            Arrays.toString(engine.getEnabledProtocols()));
-                                }
-                                if (cipherSuites != null && cipherSuites.length > 0) {
-                                    //Set supported cipher suites
-                                    engine.setEnabledCipherSuites(cipherSuites);
-                                    LOG.debug("Enabled cipher suites {}",
-                                            Arrays.toString(engine.getEnabledCipherSuites()));
-                                }
-                                channel.pipeline().addLast("ssl", new SslHandler(engine));
+
+        ServerBootstrap serverBootstrap = bootstrapFactory.newServer()
+                .handler(new LoggingHandler(LogLevel.INFO))
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(final SocketChannel channel) throws Exception {
+                        LOG.debug("New Passive channel created : {}", channel);
+                        if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+                            /* Add SSL handler first if SSL context is provided */
+                            SSLContext sslContext = certificateManagerSrv.getServerContext();
+                            SSLEngine engine = sslContext.createSSLEngine();
+                            engine.setUseClientMode(false); // work in a server mode
+                            engine.setNeedClientAuth(true); // need client authentication
+                            if (protocols != null && protocols.length > 0) {
+                                //Set supported protocols
+                                engine.setEnabledProtocols(protocols);
+                                LOG.debug("Supported ssl protocols {}",
+                                    Arrays.toString(engine.getSupportedProtocols()));
+                                LOG.debug("Enabled ssl protocols {}",
+                                    Arrays.toString(engine.getEnabledProtocols()));
+                            }
+                            if (cipherSuites != null && cipherSuites.length > 0) {
+                                //Set supported cipher suites
+                                engine.setEnabledCipherSuites(cipherSuites);
+                                LOG.debug("Enabled cipher suites {}",
+                                    Arrays.toString(engine.getEnabledCipherSuites()));
                             }
+                            channel.pipeline().addLast("ssl", new SslHandler(engine));
+                        }
+
+                        channel.pipeline().addLast(
+                            new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
+                            UTF8_ENCODER,
+                            new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
+                            new ReadTimeoutHandler(READ_TIMEOUT),
+                            new ExceptionHandler(OvsdbConnectionService.this));
 
-                            channel.pipeline().addLast(
-                                 new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
-                                 UTF8_ENCODER,
-                                 new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
-                                 new ReadTimeoutHandler(READ_TIMEOUT),
-                                 new ExceptionHandler(OvsdbConnectionService.this));
+                        handleNewPassiveConnection(channel);
+                    }
+                });
 
-                            handleNewPassiveConnection(channel);
-                        }
-                    });
-            serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
-            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR,
-                    new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+        try {
             // Start the server.
             ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
             Channel serverListenChannel = channelFuture.channel();
@@ -373,10 +362,6 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
             // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
             LOG.error("Error while binding to address {}, port {}", ip, port, throwable);
             throw throwable;
-        } finally {
-            // Shut down all event loops to terminate all threads.
-            bossGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
         }
     }