Add NettyBootstrapFactory to hold OVSDB network threads 58/86258/1
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 3 Dec 2019 15:26:59 +0000 (16:26 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 6 Dec 2019 10:40:39 +0000 (11:40 +0100)
OvsdbConnectionService is rather bad at using threads efficiently,
as each individual client creates its own EventLoopGroup, which is
not shared with anyone.

This refactors bootstrap generation, so that it is tied to a global
thread factory.

JIRA: OVSDB-411
Change-Id: Ie16e123fa3de87fd4f148c54e610515db3f1de9e
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit b49f959e3024f4385af914b8779c0e31a330cf99)

library/impl/pom.xml
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java [new file with mode: 0644]
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java

index c6b88ba34342c0122c3df681fb77ddda33e41e26..df72e5f83ea85b4efbbd421e5bae204bb3bd819b 100644 (file)
@@ -77,6 +77,11 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       <groupId>javax.inject</groupId>
       <artifactId>javax.inject</artifactId>
     </dependency>
+    <dependency>
+      <groupId>javax.annotation</groupId>
+      <artifactId>javax.annotation-api</artifactId>
+      <optional>true</optional>
+    </dependency>
 
     <!-- Testing Dependencies -->
     <dependency>
diff --git a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java
new file mode 100644 (file)
index 0000000..0449cf0
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright © 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.lib.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A globally-instantiated context for use with OvsdbConnectionService.
+ */
+@Singleton
+public class NettyBootstrapFactory implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyBootstrapFactory.class);
+
+    private final EventLoopGroup bossGroup = new NioEventLoopGroup(0,
+        new ThreadFactoryBuilder().setNameFormat("OVSDB listener-%d").build());
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup(0,
+        new ThreadFactoryBuilder().setNameFormat("OVSDB connection-%d").build());
+
+    @Inject
+    public NettyBootstrapFactory() {
+        LOG.info("OVSDB global Netty context instantiated");
+    }
+
+    Bootstrap newClient() {
+        return new Bootstrap()
+                .group(workerGroup)
+                .channel(NioSocketChannel.class)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+    }
+
+    ServerBootstrap newServer() {
+        return new ServerBootstrap()
+                .group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel.class)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535))
+                .option(ChannelOption.SO_BACKLOG, 100);
+    }
+
+    @PreDestroy
+    @Override
+    public void close() {
+        LOG.info("OVSDB global Netty context terminating");
+        bossGroup.shutdownGracefully().addListener(ignore -> {
+            LOG.info("OVSDB global server group terminated");
+        });
+        workerGroup.shutdownGracefully().addListener(ignore -> {
+            LOG.info("OVSDB global channel group terminated");
+        });
+    }
+}
index 4d21b6cf8bc5f2a6de1f58714101bdb4eda0bf11..801db220cc5288a5785223034d5a4747c31f7eef 100644 (file)
@@ -8,22 +8,18 @@
 
 package org.opendaylight.ovsdb.lib.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
@@ -109,6 +105,8 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
     private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
 
+    private final NettyBootstrapFactory bootstrapFactory;
+
     private volatile boolean useSSL = false;
     private final ICertificateManager certManagerSrv;
 
@@ -120,8 +118,9 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private volatile int listenerPort = 6640;
 
     @Inject
-    public OvsdbConnectionService(@Reference(filter = "type=default-certificate-manager")
-            final ICertificateManager certManagerSrv) {
+    public OvsdbConnectionService(final NettyBootstrapFactory bootstrapFactory,
+            @Reference(filter = "type=default-certificate-manager") final ICertificateManager certManagerSrv) {
+        this.bootstrapFactory = requireNonNull(bootstrapFactory);
         this.certManagerSrv = certManagerSrv;
     }
 
@@ -146,34 +145,30 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     @SuppressWarnings("checkstyle:IllegalCatch")
     public OvsdbClient connectWithSsl(final InetAddress address, final int port,
             final ICertificateManager certificateManagerSrv) {
-        try {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(new NioEventLoopGroup());
-            bootstrap.channel(NioSocketChannel.class);
-            bootstrap.option(ChannelOption.TCP_NODELAY, true);
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
 
-            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-                @Override
-                public void initChannel(final SocketChannel channel) throws Exception {
-                    if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
-                        SSLContext sslContext = certificateManagerSrv.getServerContext();
-                        /* First add ssl handler if ssl context is given */
-                        SSLEngine engine =
-                            sslContext.createSSLEngine(address.toString(), port);
-                        engine.setUseClientMode(true);
-                        channel.pipeline().addLast("ssl", new SslHandler(engine));
-                    }
-                    channel.pipeline().addLast(
+        Bootstrap bootstrap = bootstrapFactory.newClient()
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(final SocketChannel channel) throws Exception {
+                        if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+                            SSLContext sslContext = certificateManagerSrv.getServerContext();
+                            /* First add ssl handler if ssl context is given */
+                            SSLEngine engine =
+                                    sslContext.createSSLEngine(address.toString(), port);
+                            engine.setUseClientMode(true);
+                            channel.pipeline().addLast("ssl", new SslHandler(engine));
+                        }
+                        channel.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
                             UTF8_ENCODER,
                             new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
                             new ReadTimeoutHandler(READ_TIMEOUT),
                             new ExceptionHandler(OvsdbConnectionService.this));
-                }
-            });
+                    }
+                });
 
+        try {
             ChannelFuture future = bootstrap.connect(address, port).sync();
             Channel channel = future.channel();
             return getChannelClient(channel, ConnectionType.ACTIVE, SocketConnectionType.SSL);
@@ -245,13 +240,13 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     public synchronized boolean startOvsdbManager() {
         final int ovsdbListenerPort = this.listenerPort;
         final String ovsdbListenerIp = this.listenerIp;
-        if (!singletonCreated.getAndSet(true)) {
-            LOG.info("startOvsdbManager: Starting");
-            new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
-            return true;
-        } else {
+        if (singletonCreated.getAndSet(true)) {
             return false;
         }
+
+        LOG.info("startOvsdbManager: Starting");
+        new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
+        return true;
     }
 
     /**
@@ -313,54 +308,48 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
                                             final String[] protocols, final String[] cipherSuites) {
-        EventLoopGroup bossGroup = new NioEventLoopGroup();
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
-        try {
-            ServerBootstrap serverBootstrap = new ServerBootstrap();
-            serverBootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .option(ChannelOption.SO_BACKLOG, 100)
-                    .handler(new LoggingHandler(LogLevel.INFO))
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-                        @Override
-                        public void initChannel(final SocketChannel channel) throws Exception {
-                            LOG.debug("New Passive channel created : {}", channel);
-                            if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
-                                /* Add SSL handler first if SSL context is provided */
-                                SSLContext sslContext = certificateManagerSrv.getServerContext();
-                                SSLEngine engine = sslContext.createSSLEngine();
-                                engine.setUseClientMode(false); // work in a server mode
-                                engine.setNeedClientAuth(true); // need client authentication
-                                if (protocols != null && protocols.length > 0) {
-                                    //Set supported protocols
-                                    engine.setEnabledProtocols(protocols);
-                                    LOG.debug("Supported ssl protocols {}",
-                                            Arrays.toString(engine.getSupportedProtocols()));
-                                    LOG.debug("Enabled ssl protocols {}",
-                                            Arrays.toString(engine.getEnabledProtocols()));
-                                }
-                                if (cipherSuites != null && cipherSuites.length > 0) {
-                                    //Set supported cipher suites
-                                    engine.setEnabledCipherSuites(cipherSuites);
-                                    LOG.debug("Enabled cipher suites {}",
-                                            Arrays.toString(engine.getEnabledCipherSuites()));
-                                }
-                                channel.pipeline().addLast("ssl", new SslHandler(engine));
+
+        ServerBootstrap serverBootstrap = bootstrapFactory.newServer()
+                .handler(new LoggingHandler(LogLevel.INFO))
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(final SocketChannel channel) throws Exception {
+                        LOG.debug("New Passive channel created : {}", channel);
+                        if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+                            /* Add SSL handler first if SSL context is provided */
+                            SSLContext sslContext = certificateManagerSrv.getServerContext();
+                            SSLEngine engine = sslContext.createSSLEngine();
+                            engine.setUseClientMode(false); // work in a server mode
+                            engine.setNeedClientAuth(true); // need client authentication
+                            if (protocols != null && protocols.length > 0) {
+                                //Set supported protocols
+                                engine.setEnabledProtocols(protocols);
+                                LOG.debug("Supported ssl protocols {}",
+                                    Arrays.toString(engine.getSupportedProtocols()));
+                                LOG.debug("Enabled ssl protocols {}",
+                                    Arrays.toString(engine.getEnabledProtocols()));
+                            }
+                            if (cipherSuites != null && cipherSuites.length > 0) {
+                                //Set supported cipher suites
+                                engine.setEnabledCipherSuites(cipherSuites);
+                                LOG.debug("Enabled cipher suites {}",
+                                    Arrays.toString(engine.getEnabledCipherSuites()));
                             }
+                            channel.pipeline().addLast("ssl", new SslHandler(engine));
+                        }
+
+                        channel.pipeline().addLast(
+                            new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
+                            UTF8_ENCODER,
+                            new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
+                            new ReadTimeoutHandler(READ_TIMEOUT),
+                            new ExceptionHandler(OvsdbConnectionService.this));
 
-                            channel.pipeline().addLast(
-                                 new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
-                                 UTF8_ENCODER,
-                                 new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
-                                 new ReadTimeoutHandler(READ_TIMEOUT),
-                                 new ExceptionHandler(OvsdbConnectionService.this));
+                        handleNewPassiveConnection(channel);
+                    }
+                });
 
-                            handleNewPassiveConnection(channel);
-                        }
-                    });
-            serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
-            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR,
-                    new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+        try {
             // Start the server.
             ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
             Channel serverListenChannel = channelFuture.channel();
@@ -373,10 +362,6 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
             // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
             LOG.error("Error while binding to address {}, port {}", ip, port, throwable);
             throw throwable;
-        } finally {
-            // Shut down all event loops to terminate all threads.
-            bossGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
         }
     }