import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@Singleton
@Service(classes = OvsdbConnection.class)
public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
+ private class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
+ @Override
+ public void initChannel(final SocketChannel channel) throws Exception {
+ channel.pipeline().addLast(
+ //new LoggingHandler(LogLevel.INFO),
+ new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
+ UTF8_ENCODER,
+ new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
+ new ReadTimeoutHandler(READ_TIMEOUT),
+ new ExceptionHandler(OvsdbConnectionService.this));
+ }
+ }
+
+ private class SslClientChannelInitializer extends ClientChannelInitializer {
+ private final ICertificateManager certManagerSrv;
+ private final InetAddress address;
+ private final int port;
+
+ SslClientChannelInitializer(final ICertificateManager certManagerSrv, final InetAddress address,
+ final int port) {
+ this.certManagerSrv = requireNonNull(certManagerSrv);
+ this.address = requireNonNull(address);
+ this.port = port;
+ }
+
+ @Override
+ public void initChannel(final SocketChannel channel) throws Exception {
+ SSLContext sslContext = certManagerSrv.getServerContext();
+ if (sslContext != null) {
+ /* First add ssl handler if ssl context is given */
+ SSLEngine engine = sslContext.createSSLEngine(address.toString(), port);
+ engine.setUseClientMode(true);
+ channel.pipeline().addLast("ssl", new SslHandler(engine));
+ }
+
+ super.initChannel(channel);
+ }
+ }
+
+ private class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
+ @Override
+ public final void initChannel(final SocketChannel channel) {
+ LOG.debug("New Passive channel created : {}", channel);
+ initChannelImpl(channel);
+ }
+
+ void initChannelImpl(final SocketChannel channel) {
+ channel.pipeline().addLast(
+ new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
+ UTF8_ENCODER,
+ new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
+ new ReadTimeoutHandler(READ_TIMEOUT),
+ new ExceptionHandler(OvsdbConnectionService.this));
+ handleNewPassiveConnection(channel);
+ }
+ }
+
+ private final class SslServerChannelInitializer extends ServerChannelInitializer {
+ private final ICertificateManager certManagerSrv;
+ private final String[] protocols;
+ private final String[] cipherSuites;
+
+ SslServerChannelInitializer(final ICertificateManager certManagerSrv, final String[] protocols,
+ final String[] cipherSuites) {
+ this.certManagerSrv = requireNonNull(certManagerSrv);
+ this.protocols = requireNonNull(protocols);
+ this.cipherSuites = requireNonNull(cipherSuites);
+
+ }
+
+ SslServerChannelInitializer(final ICertificateManager certManagerSrv) {
+ this(certManagerSrv, certManagerSrv.getTlsProtocols(), certManagerSrv.getCipherSuites());
+ }
+
+ @Override
+ void initChannelImpl(final SocketChannel channel) {
+ /* Add SSL handler first if SSL context is provided */
+ final SSLContext sslContext = certManagerSrv.getServerContext();
+ if (sslContext != null) {
+ SSLEngine engine = sslContext.createSSLEngine();
+ engine.setUseClientMode(false); // work in a server mode
+ engine.setNeedClientAuth(true); // need client authentication
+ if (protocols != null && protocols.length > 0) {
+ //Set supported protocols
+ engine.setEnabledProtocols(protocols);
+ LOG.debug("Supported ssl protocols {}",
+ Arrays.toString(engine.getSupportedProtocols()));
+ LOG.debug("Enabled ssl protocols {}",
+ Arrays.toString(engine.getEnabledProtocols()));
+ }
+ if (cipherSuites != null && cipherSuites.length > 0) {
+ //Set supported cipher suites
+ engine.setEnabledCipherSuites(cipherSuites);
+ LOG.debug("Enabled cipher suites {}",
+ Arrays.toString(engine.getEnabledCipherSuites()));
+ }
+ channel.pipeline().addLast("ssl", new SslHandler(engine));
+ }
+ super.initChannelImpl(channel);
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
private static final int IDLE_READER_TIMEOUT = 30;
private static final int READ_TIMEOUT = 180;
public OvsdbClient connectWithSsl(final InetAddress address, final int port,
final ICertificateManager certificateManagerSrv) {
- Bootstrap bootstrap = bootstrapFactory.newClient()
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(final SocketChannel channel) throws Exception {
- if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
- SSLContext sslContext = certificateManagerSrv.getServerContext();
- /* First add ssl handler if ssl context is given */
- SSLEngine engine =
- sslContext.createSSLEngine(address.toString(), port);
- engine.setUseClientMode(true);
- channel.pipeline().addLast("ssl", new SslHandler(engine));
- }
- channel.pipeline().addLast(
- //new LoggingHandler(LogLevel.INFO),
- new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
- UTF8_ENCODER,
- new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
- new ReadTimeoutHandler(READ_TIMEOUT),
- new ExceptionHandler(OvsdbConnectionService.this));
- }
- });
+ final ChannelFuture future = bootstrapFactory.newClient()
+ .handler(certificateManagerSrv == null ? new ClientChannelInitializer() :
+ new SslClientChannelInitializer(certificateManagerSrv, address, port))
+ .connect(address, port);
try {
- ChannelFuture future = bootstrap.connect(address, port).sync();
- Channel channel = future.channel();
- return getChannelClient(channel, ConnectionType.ACTIVE, SocketConnectionType.SSL);
+ future.sync();
} catch (InterruptedException e) {
LOG.warn("Failed to connect {}:{}", address, port, e);
+ return null;
} catch (Throwable throwable) {
// sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
LOG.error("Error while binding to address {}, port {}", address, port, throwable);
throw throwable;
}
- return null;
+
+ return getChannelClient(future.channel(), ConnectionType.ACTIVE, SocketConnectionType.SSL);
}
@Override
final ICertificateManager certificateManagerSrv,
final String[] protocols, final String[] cipherSuites) {
if (!singletonCreated.getAndSet(true)) {
- ovsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort, certificateManagerSrv, protocols, cipherSuites);
+ ovsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort,
+ certificateManagerSrv == null ? new ServerChannelInitializer()
+ : new SslServerChannelInitializer(certificateManagerSrv, protocols, cipherSuites));
return true;
} else {
return false;
LOG.error("Certificate Manager service is not available cannot establish the SSL communication.");
return;
}
- ovsdbManagerWithSsl(ip, port, certManagerSrv, certManagerSrv.getTlsProtocols(),
- certManagerSrv.getCipherSuites());
+ ovsdbManagerWithSsl(ip, port, new SslServerChannelInitializer(certManagerSrv));
} else {
- ovsdbManagerWithSsl(ip, port, null /* SslContext */, null, null);
+ ovsdbManagerWithSsl(ip, port, new ServerChannelInitializer());
}
}
* passive connection with Ssl and handle channel callbacks.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
- private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
- final String[] protocols, final String[] cipherSuites) {
-
+ private void ovsdbManagerWithSsl(final String ip, final int port, final ServerChannelInitializer channelHandler) {
bootstrapFactory.newServer()
.handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(final SocketChannel channel) throws Exception {
- LOG.debug("New Passive channel created : {}", channel);
- if (certificateManagerSrv != null) {
- /* Add SSL handler first if SSL context is provided */
- final SSLContext sslContext = certificateManagerSrv.getServerContext();
- if (sslContext != null) {
- SSLEngine engine = sslContext.createSSLEngine();
- engine.setUseClientMode(false); // work in a server mode
- engine.setNeedClientAuth(true); // need client authentication
- if (protocols != null && protocols.length > 0) {
- //Set supported protocols
- engine.setEnabledProtocols(protocols);
- LOG.debug("Supported ssl protocols {}",
- Arrays.toString(engine.getSupportedProtocols()));
- LOG.debug("Enabled ssl protocols {}",
- Arrays.toString(engine.getEnabledProtocols()));
- }
- if (cipherSuites != null && cipherSuites.length > 0) {
- //Set supported cipher suites
- engine.setEnabledCipherSuites(cipherSuites);
- LOG.debug("Enabled cipher suites {}",
- Arrays.toString(engine.getEnabledCipherSuites()));
- }
- channel.pipeline().addLast("ssl", new SslHandler(engine));
- }
- }
-
- channel.pipeline().addLast(
- new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
- UTF8_ENCODER,
- new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
- new ReadTimeoutHandler(READ_TIMEOUT),
- new ExceptionHandler(OvsdbConnectionService.this));
-
- handleNewPassiveConnection(channel);
- }
- })
+ .childHandler(channelHandler)
// Start the server.
.bind(ip, port)
// Propagate the channel when its ready