Add NettyBootstrapFactory to hold OVSDB network threads 61/86161/13
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 3 Dec 2019 15:26:59 +0000 (16:26 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 5 Dec 2019 22:32:15 +0000 (23:32 +0100)
OvsdbConnectionService is rather bad at using threads efficiently,
as each individual client creates its own EventLoopGroup, which is
not shared with anyone.

This refactors bootstrap generation, so that it is tied to a global
thread factory.

JIRA: OVSDB-411
Change-Id: Ie16e123fa3de87fd4f148c54e610515db3f1de9e
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
library/impl/pom.xml
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java [new file with mode: 0644]
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java

index 7ce2058dfcce7246bc2353d50ffe085a22aa678a..970f9196c20739ac7a5b171c77bd73164a8b66cc 100644 (file)
@@ -80,6 +80,11 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       <groupId>javax.inject</groupId>
       <artifactId>javax.inject</artifactId>
     </dependency>
+    <dependency>
+      <groupId>javax.annotation</groupId>
+      <artifactId>javax.annotation-api</artifactId>
+      <optional>true</optional>
+    </dependency>
 
     <!-- Testing Dependencies -->
     <dependency>
diff --git a/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java b/library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/NettyBootstrapFactory.java
new file mode 100644 (file)
index 0000000..0449cf0
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright © 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.lib.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A globally-instantiated context for use with OvsdbConnectionService.
+ */
+@Singleton
+public class NettyBootstrapFactory implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyBootstrapFactory.class);
+
+    private final EventLoopGroup bossGroup = new NioEventLoopGroup(0,
+        new ThreadFactoryBuilder().setNameFormat("OVSDB listener-%d").build());
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup(0,
+        new ThreadFactoryBuilder().setNameFormat("OVSDB connection-%d").build());
+
+    @Inject
+    public NettyBootstrapFactory() {
+        LOG.info("OVSDB global Netty context instantiated");
+    }
+
+    Bootstrap newClient() {
+        return new Bootstrap()
+                .group(workerGroup)
+                .channel(NioSocketChannel.class)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+    }
+
+    ServerBootstrap newServer() {
+        return new ServerBootstrap()
+                .group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel.class)
+                .option(ChannelOption.TCP_NODELAY, true)
+                .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535))
+                .option(ChannelOption.SO_BACKLOG, 100);
+    }
+
+    @PreDestroy
+    @Override
+    public void close() {
+        LOG.info("OVSDB global Netty context terminating");
+        bossGroup.shutdownGracefully().addListener(ignore -> {
+            LOG.info("OVSDB global server group terminated");
+        });
+        workerGroup.shutdownGracefully().addListener(ignore -> {
+            LOG.info("OVSDB global channel group terminated");
+        });
+    }
+}
index 77be4ca1e30dc3ed512e5ccd1732e738adf8e9d1..2b2417356ad3c0f36fd842b770c2af1a804247f0 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.ovsdb.lib.impl;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -14,16 +16,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
@@ -109,6 +105,8 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
     private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
 
+    private final NettyBootstrapFactory bootstrapFactory;
+
     private volatile boolean useSSL = false;
     private final ICertificateManager certManagerSrv;
 
@@ -120,8 +118,9 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private volatile int listenerPort = 6640;
 
     @Inject
-    public OvsdbConnectionService(@Reference(filter = "type=default-certificate-manager")
-            final ICertificateManager certManagerSrv) {
+    public OvsdbConnectionService(final NettyBootstrapFactory bootstrapFactory,
+            @Reference(filter = "type=default-certificate-manager") final ICertificateManager certManagerSrv) {
+        this.bootstrapFactory = requireNonNull(bootstrapFactory);
         this.certManagerSrv = certManagerSrv;
     }
 
@@ -146,34 +145,30 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     @SuppressWarnings("checkstyle:IllegalCatch")
     public OvsdbClient connectWithSsl(final InetAddress address, final int port,
             final ICertificateManager certificateManagerSrv) {
-        try {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(new NioEventLoopGroup());
-            bootstrap.channel(NioSocketChannel.class);
-            bootstrap.option(ChannelOption.TCP_NODELAY, true);
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
 
-            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-                @Override
-                public void initChannel(final SocketChannel channel) throws Exception {
-                    if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
-                        SSLContext sslContext = certificateManagerSrv.getServerContext();
-                        /* First add ssl handler if ssl context is given */
-                        SSLEngine engine =
-                            sslContext.createSSLEngine(address.toString(), port);
-                        engine.setUseClientMode(true);
-                        channel.pipeline().addLast("ssl", new SslHandler(engine));
-                    }
-                    channel.pipeline().addLast(
+        Bootstrap bootstrap = bootstrapFactory.newClient()
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(final SocketChannel channel) throws Exception {
+                        if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+                            SSLContext sslContext = certificateManagerSrv.getServerContext();
+                            /* First add ssl handler if ssl context is given */
+                            SSLEngine engine =
+                                    sslContext.createSSLEngine(address.toString(), port);
+                            engine.setUseClientMode(true);
+                            channel.pipeline().addLast("ssl", new SslHandler(engine));
+                        }
+                        channel.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
                             UTF8_ENCODER,
                             new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
                             new ReadTimeoutHandler(READ_TIMEOUT),
                             new ExceptionHandler(OvsdbConnectionService.this));
-                }
-            });
+                    }
+                });
 
+        try {
             ChannelFuture future = bootstrap.connect(address, port).sync();
             Channel channel = future.channel();
             return getChannelClient(channel, ConnectionType.ACTIVE, SocketConnectionType.SSL);
@@ -245,13 +240,13 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     public synchronized boolean startOvsdbManager() {
         final int ovsdbListenerPort = this.listenerPort;
         final String ovsdbListenerIp = this.listenerIp;
-        if (!singletonCreated.getAndSet(true)) {
-            LOG.info("startOvsdbManager: Starting");
-            new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
-            return true;
-        } else {
+        if (singletonCreated.getAndSet(true)) {
             return false;
         }
+
+        LOG.info("startOvsdbManager: Starting");
+        new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
+        return true;
     }
 
     /**
@@ -313,54 +308,48 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
                                             final String[] protocols, final String[] cipherSuites) {
-        EventLoopGroup bossGroup = new NioEventLoopGroup();
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
-        try {
-            ServerBootstrap serverBootstrap = new ServerBootstrap();
-            serverBootstrap.group(bossGroup, workerGroup)
-                    .channel(NioServerSocketChannel.class)
-                    .option(ChannelOption.SO_BACKLOG, 100)
-                    .handler(new LoggingHandler(LogLevel.INFO))
-                    .childHandler(new ChannelInitializer<SocketChannel>() {
-                        @Override
-                        public void initChannel(final SocketChannel channel) throws Exception {
-                            LOG.debug("New Passive channel created : {}", channel);
-                            if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
-                                /* Add SSL handler first if SSL context is provided */
-                                SSLContext sslContext = certificateManagerSrv.getServerContext();
-                                SSLEngine engine = sslContext.createSSLEngine();
-                                engine.setUseClientMode(false); // work in a server mode
-                                engine.setNeedClientAuth(true); // need client authentication
-                                if (protocols != null && protocols.length > 0) {
-                                    //Set supported protocols
-                                    engine.setEnabledProtocols(protocols);
-                                    LOG.debug("Supported ssl protocols {}",
-                                            Arrays.toString(engine.getSupportedProtocols()));
-                                    LOG.debug("Enabled ssl protocols {}",
-                                            Arrays.toString(engine.getEnabledProtocols()));
-                                }
-                                if (cipherSuites != null && cipherSuites.length > 0) {
-                                    //Set supported cipher suites
-                                    engine.setEnabledCipherSuites(cipherSuites);
-                                    LOG.debug("Enabled cipher suites {}",
-                                            Arrays.toString(engine.getEnabledCipherSuites()));
-                                }
-                                channel.pipeline().addLast("ssl", new SslHandler(engine));
+
+        ServerBootstrap serverBootstrap = bootstrapFactory.newServer()
+                .handler(new LoggingHandler(LogLevel.INFO))
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(final SocketChannel channel) throws Exception {
+                        LOG.debug("New Passive channel created : {}", channel);
+                        if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+                            /* Add SSL handler first if SSL context is provided */
+                            SSLContext sslContext = certificateManagerSrv.getServerContext();
+                            SSLEngine engine = sslContext.createSSLEngine();
+                            engine.setUseClientMode(false); // work in a server mode
+                            engine.setNeedClientAuth(true); // need client authentication
+                            if (protocols != null && protocols.length > 0) {
+                                //Set supported protocols
+                                engine.setEnabledProtocols(protocols);
+                                LOG.debug("Supported ssl protocols {}",
+                                    Arrays.toString(engine.getSupportedProtocols()));
+                                LOG.debug("Enabled ssl protocols {}",
+                                    Arrays.toString(engine.getEnabledProtocols()));
+                            }
+                            if (cipherSuites != null && cipherSuites.length > 0) {
+                                //Set supported cipher suites
+                                engine.setEnabledCipherSuites(cipherSuites);
+                                LOG.debug("Enabled cipher suites {}",
+                                    Arrays.toString(engine.getEnabledCipherSuites()));
                             }
+                            channel.pipeline().addLast("ssl", new SslHandler(engine));
+                        }
+
+                        channel.pipeline().addLast(
+                            new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
+                            UTF8_ENCODER,
+                            new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
+                            new ReadTimeoutHandler(READ_TIMEOUT),
+                            new ExceptionHandler(OvsdbConnectionService.this));
 
-                            channel.pipeline().addLast(
-                                 new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
-                                 UTF8_ENCODER,
-                                 new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
-                                 new ReadTimeoutHandler(READ_TIMEOUT),
-                                 new ExceptionHandler(OvsdbConnectionService.this));
+                        handleNewPassiveConnection(channel);
+                    }
+                });
 
-                            handleNewPassiveConnection(channel);
-                        }
-                    });
-            serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
-            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR,
-                    new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+        try {
             // Start the server.
             ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
             Channel serverListenChannel = channelFuture.channel();
@@ -373,10 +362,6 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
             // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
             LOG.error("Error while binding to address {}, port {}", ip, port, throwable);
             throw throwable;
-        } finally {
-            // Shut down all event loops to terminate all threads.
-            bossGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
         }
     }