Update MRI projects for Aluminium
[ovsdb.git] / library / impl / src / main / java / org / opendaylight / ovsdb / lib / impl / StalePassiveConnectionService.java
index a74fa1be97544849bbb697da8bb1e9dc7df6894a..b04ca6ecf2a1afb898aa5adfe562485e6a29f236 100644 (file)
@@ -9,15 +9,15 @@ package org.opendaylight.ovsdb.lib.impl;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,16 +34,18 @@ import org.slf4j.LoggerFactory;
  * @author Vinh Nguyen (vinh.nguyen@hcl.com) on 6/10/16.
  */
 public class StalePassiveConnectionService implements AutoCloseable {
+
     private static final Logger LOG = LoggerFactory.getLogger(StalePassiveConnectionService.class);
 
-    private static Map<OvsdbClient, Map<OvsdbClient, SettableFuture<List<String>>>> pendingConnectionClients =
-            new ConcurrentHashMap<>();
+    private final Map<OvsdbClient, Set<OvsdbClient>> pendingClients = new ConcurrentHashMap<>();
+    private final Function<OvsdbClient, Void> clientNotificationCallback;
 
-    private final ScheduledExecutorService executorService;
-    private static final int ECHO_TIMEOUT = 10;
+    public StalePassiveConnectionService(Function<OvsdbClient, Void> clientNotificationCallback) {
+        this.clientNotificationCallback = clientNotificationCallback;
+    }
 
-    public StalePassiveConnectionService(final ScheduledExecutorService executorService) {
-        this.executorService = executorService;
+    public Map<OvsdbClient, Set<OvsdbClient>> getPendingClients() {
+        return new HashMap<>(pendingClients);
     }
 
     /**
@@ -55,38 +57,34 @@ public class StalePassiveConnectionService implements AutoCloseable {
      */
     public void handleNewPassiveConnection(final OvsdbClient newOvsdbClient,
                                            final List<OvsdbClient> clientsFromSameNode) {
-        final Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap = new ConcurrentHashMap<>();
-        pendingConnectionClients.put(newOvsdbClient, clientFutureMap);
-
-        // scheduled task for ping response timeout. Connections that don't response to the
-        // ping or haven't disconnected after the timeout will be closed
-        final ScheduledFuture<?> echoTimeoutFuture =
-                executorService.schedule(() -> {
-                    for (Entry<OvsdbClient, SettableFuture<List<String>>> entry : clientFutureMap.entrySet()) {
-                        OvsdbClient client = entry.getKey();
-                        Future<?> clientFuture = entry.getValue();
-                        if (!clientFuture.isDone() && !clientFuture.isCancelled()) {
-                            clientFuture.cancel(true);
-                        }
-                        if (client.isActive()) {
-                            client.disconnect();
-                        }
+        LOG.info("Adding client to pending list {}", newOvsdbClient.getConnectionInfo());
+        pendingClients.put(newOvsdbClient, new HashSet<>());
+        /*
+            if old client echo succeeds
+               do not notify new client as it has to wait
+            else
+                if all old clients got disconnected/echo failed notify the new client
+         */
+        for (final OvsdbClient oldClient : clientsFromSameNode) {
+            pendingClients.get(newOvsdbClient).add(oldClient);
+            LOG.info("Echo testing client {}", oldClient.getConnectionInfo());
+            Futures.addCallback(oldClient.echo(),
+                new FutureCallback<List<String>>() {
+                    @Override
+                    public void onSuccess(List<String> result) {
+                        //old client still active
+                        LOG.info("Echo testing of old client {} succeeded", oldClient.getConnectionInfo());
                     }
-                }, ECHO_TIMEOUT, TimeUnit.SECONDS);
 
-        // for every connection create a SettableFuture, save it to 'clientFutureMap', and send a ping (echo).
-        // The ping results in either:
-        // 1. ping response returns - the connection is active
-        // 2. the netty connection is closed due to IO exception -
-        // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives
-        // If the map is empty we proceed with new connection process
-        for (final OvsdbClient client : clientsFromSameNode) {
-            SettableFuture<List<String>> clientFuture = SettableFuture.create();
-            clientFutureMap.put(client, clientFuture);
-            Futures.addCallback(clientFuture,
-                    createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture));
-            Futures.addCallback(client.echo(),
-                    createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture));
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.info("Echo testing of old client {} failed, disconnect and notify clients",
+                                oldClient.getConnectionInfo());
+                        //disconnect the old client to cleanup, so that new connection can proceed
+                        oldClient.disconnect();
+                        onInactiveClient(oldClient);
+                    }
+                }, MoreExecutors.directExecutor());
         }
     }
 
@@ -94,50 +92,26 @@ public class StalePassiveConnectionService implements AutoCloseable {
      * Notify the service that the given client has disconnected.
      * @param disconnectedClient the client just disconnected
      */
-    public void clientDisconnected(OvsdbClient disconnectedClient) {
-        for (Entry<OvsdbClient, Map<OvsdbClient, SettableFuture<List<String>>>> entry :
-                pendingConnectionClients.entrySet()) {
-            OvsdbClient pendingClient = entry.getKey();
+    public synchronized void clientDisconnected(OvsdbClient disconnectedClient) {
+        LOG.info("Client disconnected {}", disconnectedClient.getConnectionInfo());
+        onInactiveClient(disconnectedClient);
+    }
 
-            // set the future result for pending connections that wait for this client to be disconnected
-            if (pendingClient.getConnectionInfo().getRemoteAddress()
-                    .equals(disconnectedClient.getConnectionInfo().getRemoteAddress())) {
-                Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap = entry.getValue();
-                SettableFuture<List<String>> future = clientFutureMap.get(disconnectedClient);
-                if (future != null) {
-                    future.set(null);
-                }
-            }
+    public synchronized void onInactiveClient(OvsdbClient disconnectedClient) {
+        pendingClients.remove(disconnectedClient);
+        pendingClients.entrySet().stream().forEach(entry -> entry.getValue().remove(disconnectedClient));
+        Optional<OvsdbClient> clientOptional = pendingClients.entrySet().stream()
+                .filter(entry -> entry.getValue().isEmpty())
+                .map(entry -> entry.getKey())
+                .findFirst();
+        if (clientOptional.isPresent()) {
+            LOG.info("Sending notification for client {}", clientOptional.get().getConnectionInfo());
+            pendingClients.remove(clientOptional.get());
+            clientNotificationCallback.apply(clientOptional.get());
         }
     }
 
     @Override
     public void close() {
     }
-
-    private FutureCallback<List<String>> createStaleConnectionFutureCallback(
-            final OvsdbClient cbForClient, final OvsdbClient newClient,
-            final Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap,
-            final ScheduledFuture<?> echoTimeoutFuture) {
-        return new FutureCallback<List<String>>() {
-            @Override
-            public void onSuccess(List<String> result) {
-                // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives
-                // If the map is empty we proceed with new connection process
-                clientFutureMap.remove(cbForClient);
-                if (clientFutureMap.isEmpty()) {
-                    if (!echoTimeoutFuture.isDone() && !echoTimeoutFuture.isCancelled()) {
-                        echoTimeoutFuture.cancel(true);
-                    }
-                    OvsdbConnectionService.notifyListenerForPassiveConnection(newClient);
-                    pendingConnectionClients.remove(newClient);
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error in checking stale connections)", throwable);
-            }
-        };
-    }
 }