Bug 6185 - southbound system tests failing when
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / OvsdbConnectionService.java
index 674e7af680fb4f6728f1a57f269654300180cf4a..7bda83eb8e36aae627c7fb446a6463d0d492f63b 100644 (file)
@@ -13,6 +13,9 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
@@ -40,10 +43,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
@@ -80,9 +85,15 @@ import org.slf4j.LoggerFactory;
 public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
 
-    private static ThreadFactory threadFactory = new ThreadFactoryBuilder()
-            .setNameFormat("OVSDB-PassiveConnectionService-%d").build();
-    private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, threadFactory);
+    private static ThreadFactory passiveConnectionThreadFactory = new ThreadFactoryBuilder()
+            .setNameFormat("OVSDBPassiveConnServ-%d").build();
+    private static ScheduledExecutorService executorService
+            = Executors.newScheduledThreadPool(10, passiveConnectionThreadFactory);
+
+    private static ThreadFactory connectionNotifierThreadFactory = new ThreadFactoryBuilder()
+            .setNameFormat("OVSDBConnNotifSer-%d").build();
+    private static ExecutorService connectionNotifierService
+            = Executors.newCachedThreadPool(connectionNotifierThreadFactory);
 
     private static Set<OvsdbConnectionListener> connectionListeners = Sets.newHashSet();
     private static Map<OvsdbClient, Channel> connections = Maps.newHashMap();
@@ -309,15 +320,33 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     }
 
     private static void handleNewPassiveConnection(OvsdbClient client) {
-        List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
-        if (clientsFromSameNode.size() == 0) {
-            notifyListenerForPassiveConnection(client);
-        } else {
-            stalePassiveConnectionService.handleNewPassiveConnection(client, clientsFromSameNode);
-        }
+        ListenableFuture<List<String>> echoFuture = client.echo();
+        LOG.debug("Send echo message to probe the OVSDB switch {}",client.getConnectionInfo());
+        Futures.addCallback(echoFuture, new FutureCallback<List<String>>() {
+            @Override
+            public void onSuccess(@Nullable List<String> result) {
+                LOG.debug("Probe was successful to OVSDB switch {}",client.getConnectionInfo());
+                List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
+                if (clientsFromSameNode.size() == 0) {
+                    notifyListenerForPassiveConnection(client);
+                } else {
+                    stalePassiveConnectionService.handleNewPassiveConnection(client, clientsFromSameNode);
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable failureException) {
+                LOG.error("Probe failed to OVSDB switch. Disconnecting the channel {}", client.getConnectionInfo());
+                client.disconnect();
+            }
+        }, connectionNotifierService);
     }
 
     private static void handleNewPassiveConnection(final Channel channel) {
+        if (!channel.isOpen()) {
+            LOG.warn("Channel {} is not open, skipped further processing of the connection.",channel);
+            return;
+        }
         SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
         if (sslHandler != null) {
             class HandleNewPassiveSslRunner implements Runnable {
@@ -414,10 +443,12 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     public static void channelClosed(final OvsdbClient client) {
         LOG.info("Connection closed {}", client.getConnectionInfo().toString());
         connections.remove(client);
-        for (OvsdbConnectionListener listener : connectionListeners) {
-            listener.disconnected(client);
-            stalePassiveConnectionService.clientDisconnected(client);
+        if (client.isConnectionPublished()) {
+            for (OvsdbConnectionListener listener : connectionListeners) {
+                listener.disconnected(client);
+            }
         }
+        stalePassiveConnectionService.clientDisconnected(client);
     }
 
     @Override
@@ -454,9 +485,16 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
         return passiveClients;
     }
 
-    public static void notifyListenerForPassiveConnection(OvsdbClient client) {
-        for (OvsdbConnectionListener listener : connectionListeners) {
-            listener.connected(client);
+    public static void notifyListenerForPassiveConnection(final OvsdbClient client) {
+        client.setConnectionPublished(true);
+        for (final OvsdbConnectionListener listener : connectionListeners) {
+            connectionNotifierService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    LOG.trace("Connection {} notified to listener {}", client.getConnectionInfo(), listener);
+                    listener.connected(client);
+                }
+            });
         }
     }
 }