package org.opendaylight.ovsdb.lib.impl;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.CharsetUtil;
import java.net.InetAddress;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLPeerUnverifiedException;
+import org.apache.aries.blueprint.annotation.service.Reference;
+import org.apache.aries.blueprint.annotation.service.Service;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.aaa.cert.api.ICertificateManager;
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.opendaylight.ovsdb.lib.OvsdbConnection;
import org.opendaylight.ovsdb.lib.jsonrpc.ExceptionHandler;
import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
-import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
-import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* environment. Hence a single instance of the service will be active (via Service Registry in OSGi)
* and a Singleton object in a non-OSGi environment.
*/
+@Singleton
+@Service(classes = OvsdbConnection.class)
public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
private static final int IDLE_READER_TIMEOUT = 30;
private static final String USE_SSL = "use-ssl";
private static final int RETRY_PERIOD = 100; // retry after 100 milliseconds
+ private static final StringEncoder UTF8_ENCODER = new StringEncoder(StandardCharsets.UTF_8);
+
private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(10,
new ThreadFactoryBuilder().setNameFormat("OVSDBPassiveConnServ-%d").build());
private volatile int listenerPort = 6640;
@Inject
- public OvsdbConnectionService(ICertificateManager certManagerSrv) {
+ public OvsdbConnectionService(@Reference(filter = "type=default-certificate-manager")
+ final ICertificateManager certManagerSrv) {
this.certManagerSrv = certManagerSrv;
}
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public OvsdbClient connectWithSsl(final InetAddress address, final int port,
- final ICertificateManager certificateManagerSrv) {
+ final ICertificateManager certificateManagerSrv) {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup());
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
- public void initChannel(SocketChannel channel) throws Exception {
+ public void initChannel(final SocketChannel channel) throws Exception {
if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
SSLContext sslContext = certificateManagerSrv.getServerContext();
/* First add ssl handler if ssl context is given */
channel.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
- new StringEncoder(CharsetUtil.UTF_8),
+ UTF8_ENCODER,
new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
new ReadTimeoutHandler(READ_TIMEOUT),
new ExceptionHandler(OvsdbConnectionService.this));
return getChannelClient(channel, ConnectionType.ACTIVE, SocketConnectionType.SSL);
} catch (InterruptedException e) {
LOG.warn("Failed to connect {}:{}", address, port, e);
+ } catch (Throwable throwable) {
+ // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
+ LOG.error("Error while binding to address {}, port {}", address, port, throwable);
+ throw throwable;
}
return null;
}
@Override
- public void disconnect(OvsdbClient client) {
+ public void disconnect(final OvsdbClient client) {
if (client == null) {
return;
}
}
@Override
- public void registerConnectionListener(OvsdbConnectionListener listener) {
+ public void registerConnectionListener(final OvsdbConnectionListener listener) {
LOG.info("registerConnectionListener: registering {}", listener.getClass().getSimpleName());
CONNECTION_LISTENERS.add(listener);
notifyAlreadyExistingConnectionsToListener(listener);
}
@Override
- public void unregisterConnectionListener(OvsdbConnectionListener listener) {
+ public void unregisterConnectionListener(final OvsdbConnectionListener listener) {
CONNECTION_LISTENERS.remove(listener);
}
- private static OvsdbClient getChannelClient(Channel channel, ConnectionType type,
- SocketConnectionType socketConnType) {
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- objectMapper.setSerializationInclusion(Include.NON_NULL);
+ private static OvsdbClient getChannelClient(final Channel channel, final ConnectionType type,
+ final SocketConnectionType socketConnType) {
- JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
- JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
- binderHandler.setContext(channel);
- channel.pipeline().addLast(binderHandler);
+ JsonRpcEndpoint endpoint = new JsonRpcEndpoint(channel);
+ channel.pipeline().addLast(endpoint);
- OvsdbRPC rpc = factory.getClient(channel, OvsdbRPC.class);
- OvsdbClientImpl client = new OvsdbClientImpl(rpc, channel, type, socketConnType);
+ OvsdbClientImpl client = new OvsdbClientImpl(endpoint, channel, type, socketConnType);
client.setConnectionPublished(true);
CONNECTIONS.put(client, channel);
- ChannelFuture closeFuture = channel.closeFuture();
- closeFuture.addListener(new ChannelConnectionHandler(client));
+ channel.closeFuture().addListener(new ChannelConnectionHandler(client));
return client;
}
@Override
public synchronized boolean startOvsdbManagerWithSsl(final String ovsdbListenIp, final int ovsdbListenPort,
final ICertificateManager certificateManagerSrv,
- String[] protocols, String[] cipherSuites) {
+ final String[] protocols, final String[] cipherSuites) {
if (!singletonCreated.getAndSet(true)) {
new Thread(() -> ovsdbManagerWithSsl(ovsdbListenIp, ovsdbListenPort,
certificateManagerSrv, protocols, cipherSuites)).start();
* If the SSL flag is enabled, the method internally will establish TLS communication using the default
* ODL certificateManager SSLContext and attributes.
*/
- private void ovsdbManager(String ip, int port) {
+ private void ovsdbManager(final String ip, final int port) {
if (useSSL) {
if (certManagerSrv == null) {
LOG.error("Certificate Manager service is not available cannot establish the SSL communication.");
* OVSDB Passive listening thread that uses Netty ServerBootstrap to open
* passive connection with Ssl and handle channel callbacks.
*/
- private void ovsdbManagerWithSsl(String ip, int port, final ICertificateManager certificateManagerSrv,
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void ovsdbManagerWithSsl(final String ip, final int port, final ICertificateManager certificateManagerSrv,
final String[] protocols, final String[] cipherSuites) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
- public void initChannel(SocketChannel channel) throws Exception {
+ public void initChannel(final SocketChannel channel) throws Exception {
LOG.debug("New Passive channel created : {}", channel);
if (certificateManagerSrv != null && certificateManagerSrv.getServerContext() != null) {
/* Add SSL handler first if SSL context is provided */
channel.pipeline().addLast(
new JsonRpcDecoder(jsonRpcDecoderMaxFrameLength),
- new StringEncoder(CharsetUtil.UTF_8),
+ UTF8_ENCODER,
new IdleStateHandler(IDLE_READER_TIMEOUT, 0, 0),
new ReadTimeoutHandler(READ_TIMEOUT),
new ExceptionHandler(OvsdbConnectionService.this));
serverListenChannel.closeFuture().sync();
} catch (InterruptedException e) {
LOG.error("Thread interrupted", e);
+ } catch (Throwable throwable) {
+ // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
+ LOG.error("Error while binding to address {}, port {}", ip, port, throwable);
+ throw throwable;
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
}
}
- private static void handleNewPassiveConnection(OvsdbClient client) {
+ private static void handleNewPassiveConnection(final OvsdbClient client) {
ListenableFuture<List<String>> echoFuture = client.echo();
LOG.debug("Send echo message to probe the OVSDB switch {}",client.getConnectionInfo());
Futures.addCallback(echoFuture, new FutureCallback<List<String>>() {
@Override
- public void onSuccess(@Nullable List<String> result) {
+ public void onSuccess(@Nullable final List<String> result) {
LOG.debug("Probe was successful to OVSDB switch {}",client.getConnectionInfo());
List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
if (clientsFromSameNode.size() == 0) {
}
@Override
- public void onFailure(Throwable failureException) {
+ public void onFailure(final Throwable failureException) {
LOG.error("Probe failed to OVSDB switch. Disconnecting the channel {}", client.getConnectionInfo());
client.disconnect();
}
}
public static void channelClosed(final OvsdbClient client) {
- LOG.info("Connection closed {}", client.getConnectionInfo().toString());
+ LOG.info("Connection closed {}", client.getConnectionInfo());
CONNECTIONS.remove(client);
if (client.isConnectionPublished()) {
for (OvsdbConnectionListener listener : CONNECTION_LISTENERS) {
}
@Override
- public OvsdbClient getClient(Channel channel) {
+ public OvsdbClient getClient(final Channel channel) {
for (Entry<OvsdbClient, Channel> entry : CONNECTIONS.entrySet()) {
OvsdbClient client = entry.getKey();
Channel ctx = entry.getValue();
return null;
}
- private static List<OvsdbClient> getPassiveClientsFromSameNode(OvsdbClient ovsdbClient) {
+ private static List<OvsdbClient> getPassiveClientsFromSameNode(final OvsdbClient ovsdbClient) {
List<OvsdbClient> passiveClients = new ArrayList<>();
for (OvsdbClient client : CONNECTIONS.keySet()) {
if (!client.equals(ovsdbClient)
}
}
- public void setOvsdbRpcTaskTimeout(int timeout) {
+ public void setOvsdbRpcTaskTimeout(final int timeout) {
JsonRpcEndpoint.setReaperInterval(timeout);
}
*
* @param flag boolean for using ssl
*/
- public void setUseSsl(boolean flag) {
+ public void setUseSsl(final boolean flag) {
useSSL = flag;
}
* change at the run time will have no impact.
* @param maxFrameLength Max frame length (default : 100000)
*/
- public void setJsonRpcDecoderMaxFrameLength(int maxFrameLength) {
+ public void setJsonRpcDecoderMaxFrameLength(final int maxFrameLength) {
jsonRpcDecoderMaxFrameLength = maxFrameLength;
LOG.info("Json Rpc Decoder Max Frame Length set to : {}", jsonRpcDecoderMaxFrameLength);
}
- public void setOvsdbListenerIp(String ip) {
+ public void setOvsdbListenerIp(final String ip) {
LOG.info("OVSDB IP for listening connection is set to : {}", ip);
listenerIp = ip;
}
- public void setOvsdbListenerPort(int portNumber) {
+ public void setOvsdbListenerPort(final int portNumber) {
LOG.info("OVSDB port for listening connection is set to : {}", portNumber);
listenerPort = portNumber;
}
- public void updateConfigParameter(Map<String, Object> configParameters) {
+ public void updateConfigParameter(final Map<String, Object> configParameters) {
if (configParameters != null && !configParameters.isEmpty()) {
LOG.debug("Config parameters received : {}", configParameters.entrySet());
for (Map.Entry<String, Object> paramEntry : configParameters.entrySet()) {