--- /dev/null
+/*
+ * Copyright © 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.lib.impl;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.AdaptiveRecvByteBufAllocator;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A globally-instantiated context for use with OvsdbConnectionService.
+ */
+@Singleton
+public class NettyBootstrapFactory implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyBootstrapFactory.class);
+
+ private final EventLoopGroup bossGroup = new NioEventLoopGroup(0,
+ new ThreadFactoryBuilder().setNameFormat("OVSDB listener-%d").build());
+ private final EventLoopGroup workerGroup = new NioEventLoopGroup(0,
+ new ThreadFactoryBuilder().setNameFormat("OVSDB connection-%d").build());
+
+ @Inject
+ public NettyBootstrapFactory() {
+ LOG.info("OVSDB global Netty context instantiated");
+ }
+
+ Bootstrap newClient() {
+ return new Bootstrap()
+ .group(workerGroup)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+ }
+
+ ServerBootstrap newServer() {
+ return new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535))
+ .option(ChannelOption.SO_BACKLOG, 100);
+ }
+
+ @PreDestroy
+ @Override
+ public void close() {
+ LOG.info("OVSDB global Netty context terminating");
+ bossGroup.shutdownGracefully().addListener(ignore -> {
+ LOG.info("OVSDB global server group terminated");
+ });
+ workerGroup.shutdownGracefully().addListener(ignore -> {
+ LOG.info("OVSDB global channel group terminated");
+ });
+ }
+}
package org.opendaylight.ovsdb.lib.impl;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
+ private final NettyBootstrapFactory bootstrapFactory;
+
private volatile boolean useSSL = false;
private final ICertificateManager certManagerSrv;
private volatile int listenerPort = 6640;
@Inject
- public OvsdbConnectionService(@Reference(filter = "type=default-certificate-manager")
- final ICertificateManager certManagerSrv) {
+ public OvsdbConnectionService(final NettyBootstrapFactory bootstrapFactory,
+ @Reference(filter = "type=default-certificate-manager") final ICertificateManager certManagerSrv) {
+ this.bootstrapFactory = requireNonNull(bootstrapFactory);
this.certManagerSrv = certManagerSrv;
}
@SuppressWarnings("checkstyle:IllegalCatch")
public OvsdbClient connectWithSsl(final InetAddress address, final int port,
final ICertificateManager certificateManagerSrv) {
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(new NioEventLoopGroup());
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.option(ChannelOption.TCP_NODELAY, true);
- bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(final SocketChannel channel) throws Exception {
- if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
- SSLContext sslContext = certificateManagerSrv.getServerContext();
- /* First add ssl handler if ssl context is given */
- SSLEngine engine =
- sslContext.createSSLEngine(address.toString(), port);
- engine.setUseClientMode(true);
- channel.pipeline().addLast("ssl", new SslHandler(engine));
- }
- channel.pipeline().addLast(
+ Bootstrap bootstrap = bootstrapFactory.newClient()
+ .handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(final SocketChannel channel) throws Exception {
+ if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+ SSLContext sslContext = certificateManagerSrv.getServerContext();
+ /* First add ssl handler if ssl context is given */
+ SSLEngine engine =
+ sslContext.createSSLEngine(address.toString(), port);
+ engine.setUseClientMode(true);
+ channel.pipeline().addLast("ssl", new SslHandler(engine));
+ }
+ channel.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
UTF8_ENCODER,
new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
new ReadTimeoutHandler(READ_TIMEOUT),
new ExceptionHandler(OvsdbConnectionService.this));
- }
- });
+ }
+ });
+ try {
ChannelFuture future = bootstrap.connect(address, port).sync();
Channel channel = future.channel();
return getChannelClient(channel, ConnectionType.ACTIVE, SocketConnectionType.SSL);
public synchronized boolean startOvsdbManager() {
final int ovsdbListenerPort = this.listenerPort;
final String ovsdbListenerIp = this.listenerIp;
- if (!singletonCreated.getAndSet(true)) {
- LOG.info("startOvsdbManager: Starting");
- new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
- return true;
- } else {
+ if (singletonCreated.getAndSet(true)) {
return false;
}
+
+ LOG.info("startOvsdbManager: Starting");
+ new Thread(() -> ovsdbManager(ovsdbListenerIp, ovsdbListenerPort)).start();
+ return true;
}
/**
@SuppressWarnings("checkstyle:IllegalCatch")
private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
final String[] protocols, final String[] cipherSuites) {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap serverBootstrap = new ServerBootstrap();
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 100)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(final SocketChannel channel) throws Exception {
- LOG.debug("New Passive channel created : {}", channel);
- if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
- /* Add SSL handler first if SSL context is provided */
- SSLContext sslContext = certificateManagerSrv.getServerContext();
- SSLEngine engine = sslContext.createSSLEngine();
- engine.setUseClientMode(false); // work in a server mode
- engine.setNeedClientAuth(true); // need client authentication
- if (protocols != null && protocols.length > 0) {
- //Set supported protocols
- engine.setEnabledProtocols(protocols);
- LOG.debug("Supported ssl protocols {}",
- Arrays.toString(engine.getSupportedProtocols()));
- LOG.debug("Enabled ssl protocols {}",
- Arrays.toString(engine.getEnabledProtocols()));
- }
- if (cipherSuites != null && cipherSuites.length > 0) {
- //Set supported cipher suites
- engine.setEnabledCipherSuites(cipherSuites);
- LOG.debug("Enabled cipher suites {}",
- Arrays.toString(engine.getEnabledCipherSuites()));
- }
- channel.pipeline().addLast("ssl", new SslHandler(engine));
+
+ ServerBootstrap serverBootstrap = bootstrapFactory.newServer()
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(final SocketChannel channel) throws Exception {
+ LOG.debug("New Passive channel created : {}", channel);
+ if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
+ /* Add SSL handler first if SSL context is provided */
+ SSLContext sslContext = certificateManagerSrv.getServerContext();
+ SSLEngine engine = sslContext.createSSLEngine();
+ engine.setUseClientMode(false); // work in a server mode
+ engine.setNeedClientAuth(true); // need client authentication
+ if (protocols != null && protocols.length > 0) {
+ //Set supported protocols
+ engine.setEnabledProtocols(protocols);
+ LOG.debug("Supported ssl protocols {}",
+ Arrays.toString(engine.getSupportedProtocols()));
+ LOG.debug("Enabled ssl protocols {}",
+ Arrays.toString(engine.getEnabledProtocols()));
+ }
+ if (cipherSuites != null && cipherSuites.length > 0) {
+ //Set supported cipher suites
+ engine.setEnabledCipherSuites(cipherSuites);
+ LOG.debug("Enabled cipher suites {}",
+ Arrays.toString(engine.getEnabledCipherSuites()));
}
+ channel.pipeline().addLast("ssl", new SslHandler(engine));
+ }
+
+ channel.pipeline().addLast(
+ new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
+ UTF8_ENCODER,
+ new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
+ new ReadTimeoutHandler(READ_TIMEOUT),
+ new ExceptionHandler(OvsdbConnectionService.this));
- channel.pipeline().addLast(
- new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
- UTF8_ENCODER,
- new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
- new ReadTimeoutHandler(READ_TIMEOUT),
- new ExceptionHandler(OvsdbConnectionService.this));
+ handleNewPassiveConnection(channel);
+ }
+ });
- handleNewPassiveConnection(channel);
- }
- });
- serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
- serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR,
- new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
+ try {
// Start the server.
ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
Channel serverListenChannel = channelFuture.channel();
// sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
LOG.error("Error while binding to address {}, port {}", ip, port, throwable);
throw throwable;
- } finally {
- // Shut down all event loops to terminate all threads.
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
}
}