import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
- private static ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setNameFormat("OVSDB-PassiveConnectionService-%d").build();
- private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, threadFactory);
+ private static ThreadFactory passiveConnectionThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("OVSDBPassiveConnServ-%d").build();
+ private static ScheduledExecutorService executorService
+ = Executors.newScheduledThreadPool(10, passiveConnectionThreadFactory);
+
+ private static ThreadFactory connectionNotifierThreadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("OVSDBConnNotifSer-%d").build();
+ private static ExecutorService connectionNotifierService
+ = Executors.newCachedThreadPool(connectionNotifierThreadFactory);
private static Set<OvsdbConnectionListener> connectionListeners = Sets.newHashSet();
private static Map<OvsdbClient, Channel> connections = Maps.newHashMap();
}
private static void handleNewPassiveConnection(OvsdbClient client) {
- List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
- if (clientsFromSameNode.size() == 0) {
- notifyListenerForPassiveConnection(client);
- } else {
- stalePassiveConnectionService.handleNewPassiveConnection(client, clientsFromSameNode);
- }
+ ListenableFuture<List<String>> echoFuture = client.echo();
+ LOG.debug("Send echo message to probe the OVSDB switch {}",client.getConnectionInfo());
+ Futures.addCallback(echoFuture, new FutureCallback<List<String>>() {
+ @Override
+ public void onSuccess(@Nullable List<String> result) {
+ LOG.debug("Probe was successful to OVSDB switch {}",client.getConnectionInfo());
+ List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
+ if (clientsFromSameNode.size() == 0) {
+ notifyListenerForPassiveConnection(client);
+ } else {
+ stalePassiveConnectionService.handleNewPassiveConnection(client, clientsFromSameNode);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable failureException) {
+ LOG.error("Probe failed to OVSDB switch. Disconnecting the channel {}", client.getConnectionInfo());
+ client.disconnect();
+ }
+ }, connectionNotifierService);
}
private static void handleNewPassiveConnection(final Channel channel) {
+ if (!channel.isOpen()) {
+ LOG.warn("Channel {} is not open, skipped further processing of the connection.",channel);
+ return;
+ }
SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
if (sslHandler != null) {
class HandleNewPassiveSslRunner implements Runnable {
public static void channelClosed(final OvsdbClient client) {
LOG.info("Connection closed {}", client.getConnectionInfo().toString());
connections.remove(client);
- for (OvsdbConnectionListener listener : connectionListeners) {
- listener.disconnected(client);
- stalePassiveConnectionService.clientDisconnected(client);
+ if (client.isConnectionPublished()) {
+ for (OvsdbConnectionListener listener : connectionListeners) {
+ listener.disconnected(client);
+ }
}
+ stalePassiveConnectionService.clientDisconnected(client);
}
@Override
return passiveClients;
}
- public static void notifyListenerForPassiveConnection(OvsdbClient client) {
- for (OvsdbConnectionListener listener : connectionListeners) {
- listener.connected(client);
+ public static void notifyListenerForPassiveConnection(final OvsdbClient client) {
+ client.setConnectionPublished(true);
+ for (final OvsdbConnectionListener listener : connectionListeners) {
+ connectionNotifierService.submit(new Runnable() {
+ @Override
+ public void run() {
+ LOG.trace("Connection {} notified to listener {}", client.getConnectionInfo(), listener);
+ listener.connected(client);
+ }
+ });
}
}
}