OVSDB-439 Stale connection check 03/71303/12
authorK.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
Wed, 25 Apr 2018 10:07:39 +0000 (15:37 +0530)
committerVishal Thapar <vthapar@redhat.com>
Fri, 6 Jul 2018 02:05:43 +0000 (02:05 +0000)
if old client echo succeeds
   do not notify new client as it has to wait
else
   if all old clients got disconnected notify the new client

if new client gets disconnected remove it from pending clients

JIRA: OVSDB-439
JIRA: OVSDB-462
Change-Id: Id3cd37559825d7f0ef58331bfbd4757eae41d253
Signed-off-by: K.V Suneelu Verma <k.v.suneelu.verma@ericsson.com>
library/impl/pom.xml
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/StalePassiveConnectionService.java
library/impl/src/test/java/org/opendaylight/ovsdb/lib/StalePassiveConnectionServiceTest.java [new file with mode: 0644]

index 2c489ac7c530b143c10423db03326c28523519fa..d7cd87de3c695c6a7e00a80a5eba4888afeb5cf9 100644 (file)
@@ -37,7 +37,18 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>
-    <dependency>
+      <dependency>
+          <groupId>org.powermock</groupId>
+          <artifactId>powermock-api-mockito</artifactId>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
+          <groupId>org.powermock</groupId>
+          <artifactId>powermock-module-junit4</artifactId>
+          <scope>test</scope>
+      </dependency>
+
+      <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
index cca48636a2e3af151f824802b86f653e3cb8e7b1..7e12a4ee7fc21c7a6ff6bb9ce0da2770b44cbd58 100644 (file)
@@ -98,7 +98,10 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
             .newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("OVSDBConnNotifSer-%d").build());
 
     private static final StalePassiveConnectionService STALE_PASSIVE_CONNECTION_SERVICE =
-            new StalePassiveConnectionService(EXECUTOR_SERVICE);
+            new StalePassiveConnectionService((client) -> {
+                notifyListenerForPassiveConnection(client);
+                return null;
+            });
 
     private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
     private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
index 66f0f3e148f105b4f61b14286aed1f5e0080adc9..fa1edfb5b4469538fe32a21d7291a55f4f155191 100644 (file)
@@ -9,16 +9,16 @@ package org.opendaylight.ovsdb.lib.impl;
 
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CancellationException;
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
 import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,16 +35,18 @@ import org.slf4j.LoggerFactory;
  * @author Vinh Nguyen (vinh.nguyen@hcl.com) on 6/10/16.
  */
 public class StalePassiveConnectionService implements AutoCloseable {
+
     private static final Logger LOG = LoggerFactory.getLogger(StalePassiveConnectionService.class);
 
-    private static Map<OvsdbClient, Map<OvsdbClient, SettableFuture<List<String>>>> pendingConnectionClients =
-            new ConcurrentHashMap<>();
+    private final Map<OvsdbClient, Set<OvsdbClient>> pendingClients = new ConcurrentHashMap<>();
+    private final Function<OvsdbClient, Void> clientNotificationCallback;
 
-    private final ScheduledExecutorService executorService;
-    private static final int ECHO_TIMEOUT = 10;
+    public StalePassiveConnectionService(Function<OvsdbClient, Void> clientNotificationCallback) {
+        this.clientNotificationCallback = clientNotificationCallback;
+    }
 
-    public StalePassiveConnectionService(final ScheduledExecutorService executorService) {
-        this.executorService = executorService;
+    public Map<OvsdbClient, Set<OvsdbClient>> getPendingClients() {
+        return new HashMap(pendingClients);
     }
 
     /**
@@ -56,38 +58,31 @@ public class StalePassiveConnectionService implements AutoCloseable {
      */
     public void handleNewPassiveConnection(final OvsdbClient newOvsdbClient,
                                            final List<OvsdbClient> clientsFromSameNode) {
-        final Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap = new ConcurrentHashMap<>();
-        pendingConnectionClients.put(newOvsdbClient, clientFutureMap);
-
-        // scheduled task for ping response timeout. Connections that don't response to the
-        // ping or haven't disconnected after the timeout will be closed
-        final ScheduledFuture<?> echoTimeoutFuture =
-                executorService.schedule(() -> {
-                    for (Entry<OvsdbClient, SettableFuture<List<String>>> entry : clientFutureMap.entrySet()) {
-                        OvsdbClient client = entry.getKey();
-                        Future<?> clientFuture = entry.getValue();
-                        if (!clientFuture.isDone() && !clientFuture.isCancelled()) {
-                            clientFuture.cancel(true);
-                        }
-                        if (client.isActive()) {
-                            client.disconnect();
-                        }
+        LOG.info("Adding client to pending list {}", newOvsdbClient.getConnectionInfo());
+        pendingClients.put(newOvsdbClient, new HashSet<>());
+        /*
+            if old client echo succeeds
+               do not notify new client as it has to wait
+            else
+                if all old clients got disconnected/echo failed notify the new client
+         */
+        for (final OvsdbClient oldClient : clientsFromSameNode) {
+            pendingClients.get(newOvsdbClient).add(oldClient);
+            LOG.info("Echo testing client {}", oldClient.getConnectionInfo());
+            Futures.addCallback(oldClient.echo(),
+                new FutureCallback<List<String>>() {
+                    @Override
+                    public void onSuccess(List<String> result) {
+                        //old client still active
+                        LOG.info("Echo testing of old client {} succeded", oldClient.getConnectionInfo());
                     }
-                }, ECHO_TIMEOUT, TimeUnit.SECONDS);
 
-        // for every connection create a SettableFuture, save it to 'clientFutureMap', and send a ping (echo).
-        // The ping results in either:
-        // 1. ping response returns - the connection is active
-        // 2. the netty connection is closed due to IO exception -
-        // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives
-        // If the map is empty we proceed with new connection process
-        for (final OvsdbClient client : clientsFromSameNode) {
-            SettableFuture<List<String>> clientFuture = SettableFuture.create();
-            clientFutureMap.put(client, clientFuture);
-            Futures.addCallback(clientFuture,
-                    createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture));
-            Futures.addCallback(client.echo(),
-                    createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture));
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.info("Echo testing of old client {} failed", oldClient.getConnectionInfo());
+                        onInactiveClient(oldClient);
+                    }
+                });
         }
     }
 
@@ -95,67 +90,26 @@ public class StalePassiveConnectionService implements AutoCloseable {
      * Notify the service that the given client has disconnected.
      * @param disconnectedClient the client just disconnected
      */
-    public void clientDisconnected(OvsdbClient disconnectedClient) {
-        for (Entry<OvsdbClient, Map<OvsdbClient, SettableFuture<List<String>>>> entry :
-                pendingConnectionClients.entrySet()) {
-            OvsdbClient pendingClient = entry.getKey();
+    public synchronized void clientDisconnected(OvsdbClient disconnectedClient) {
+        LOG.info("Client disconnected {}", disconnectedClient.getConnectionInfo());
+        onInactiveClient(disconnectedClient);
+    }
 
-            // set the future result for pending connections that wait for this client to be disconnected
-            if (pendingClient.getConnectionInfo().getRemoteAddress()
-                    .equals(disconnectedClient.getConnectionInfo().getRemoteAddress())) {
-                Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap = entry.getValue();
-                SettableFuture<List<String>> future = clientFutureMap.get(disconnectedClient);
-                if (future != null) {
-                    future.set(null);
-                }
-            }
+    public synchronized void onInactiveClient(OvsdbClient disconnectedClient) {
+        pendingClients.remove(disconnectedClient);
+        pendingClients.entrySet().stream().forEach(entry -> entry.getValue().remove(disconnectedClient));
+        Optional<OvsdbClient> clientOptional = pendingClients.entrySet().stream()
+                .filter(entry -> entry.getValue().isEmpty())
+                .map(entry -> entry.getKey())
+                .findFirst();
+        if (clientOptional.isPresent()) {
+            LOG.info("Sending notification for client {}", clientOptional.get().getConnectionInfo());
+            pendingClients.remove(clientOptional.get());
+            clientNotificationCallback.apply(clientOptional.get());
         }
     }
 
     @Override
     public void close() {
     }
-
-    private FutureCallback<List<String>> createStaleConnectionFutureCallback(
-            final OvsdbClient cbForClient, final OvsdbClient newClient,
-            final Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap,
-            final ScheduledFuture<?> echoTimeoutFuture) {
-        return new FutureCallback<List<String>>() {
-            @Override
-            public void onSuccess(List<String> result) {
-                // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives
-                // If the map is empty we proceed with new connection process
-                clientFutureMap.remove(cbForClient);
-                if (clientFutureMap.isEmpty()) {
-                    if (!echoTimeoutFuture.isDone() && !echoTimeoutFuture.isCancelled()) {
-                        echoTimeoutFuture.cancel(true);
-                    }
-                    OvsdbConnectionService.notifyListenerForPassiveConnection(newClient);
-                    pendingConnectionClients.remove(newClient);
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                if (!(throwable instanceof CancellationException)) {
-                    LOG.error("Error in checking stale connections)", throwable);
-                    return;
-                }
-
-                LOG.debug("Ping for {} was cancelled as a result of timeout. Assuming client connection is gone.",
-                                        cbForClient);
-
-                if (cbForClient.isActive()) {
-                    cbForClient.disconnect();
-                }
-
-                clientFutureMap.remove(cbForClient);
-
-                if (clientFutureMap.isEmpty()) {
-                    OvsdbConnectionService.notifyListenerForPassiveConnection(newClient);
-                    pendingConnectionClients.remove(newClient);
-                }
-            }
-        };
-    }
-}
+}
\ No newline at end of file
diff --git a/library/impl/src/test/java/org/opendaylight/ovsdb/lib/StalePassiveConnectionServiceTest.java b/library/impl/src/test/java/org/opendaylight/ovsdb/lib/StalePassiveConnectionServiceTest.java
new file mode 100644 (file)
index 0000000..a8c627f
--- /dev/null
@@ -0,0 +1,171 @@
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.lib;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.ovsdb.lib.impl.StalePassiveConnectionService;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ StalePassiveConnectionService.class })
+public class StalePassiveConnectionServiceTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StalePassiveConnectionService.class);
+    private static final String NOTIFIED = "NOTIFIED";
+    private Map<OvsdbClient, SettableFuture<String>> clientJobRunFutures = new HashMap<>();
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+    private StalePassiveConnectionService staleConnectionService
+            = new StalePassiveConnectionService((client) -> {
+                if (clientJobRunFutures.get(client) != null) {
+                    clientJobRunFutures.get(client).set(NOTIFIED);
+                }
+                return null;
+            });
+
+    private OvsdbClient firstClient = createClient("127.0.0.1", 8001);
+    private OvsdbClient secondClient = createClient("127.0.0.1", 8002);
+    private OvsdbClient thirdClient = createClient("127.0.0.1", 8003);
+
+    @Test
+    public void testFirstClientAlive() throws Exception {
+        staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+        clientShouldNotBeNotified(secondClient, "Second client should not be processed while first client is active");
+        staleConnectionService.clientDisconnected(firstClient);
+        clientShouldBeNotified(secondClient, "Second client should be notified after first client disconnect");
+    }
+
+    @Test
+    public void testSecondClientDisconnect() throws Exception {
+        staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+        clientShouldNotBeNotified(secondClient, "Second client should not be processed while first client is active");
+        staleConnectionService.clientDisconnected(secondClient);
+        clientShouldNotBeNotified(secondClient, "Second client should not be processed post its disconnect");
+        clientShouldBeClearedFromPendingList(secondClient, "Second client should be cleared from pending state");
+    }
+
+    @Test
+    public void testFirstClientInActive() throws Exception {
+        firstClient.echo();
+        when(firstClient.echo()).thenAnswer(delayedEchoFailedResponse(100, MILLISECONDS));
+        staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+        clientShouldBeNotified(secondClient, "Second client should be notified after first client echo failed");
+        clientShouldBeClearedFromPendingList(secondClient, "Second client should be cleared from pending state");
+    }
+
+    @Test
+    public void testThreeClients() throws Exception {
+        staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+        staleConnectionService.handleNewPassiveConnection(thirdClient, Lists.newArrayList(firstClient, secondClient));
+        clientShouldNotBeNotified(thirdClient, "Third client should not be processed while first client is there");
+        clientShouldNotBeNotified(secondClient, "Second client should not be processed while first client is there");
+
+        //disconnect first client
+        staleConnectionService.clientDisconnected(firstClient);
+        //now second client should be processed
+        clientShouldBeNotified(secondClient, "Second client should be notified after first client disconnected");
+        clientShouldNotBeNotified(thirdClient, "Third client should not be processed while second client is active");
+
+        //disconnect second client
+        staleConnectionService.clientDisconnected(secondClient);
+        //now third client should be processed
+        clientShouldBeNotified(secondClient, "Third client should be notified after second client also disconnected");
+    }
+
+    @Test
+    public void testDelayedFirstClientFailure() throws Exception {
+        /*
+        first client arrived
+        second client arrived
+        first client echo success
+        keep second client on wait list
+
+        third client arrived
+        second client echo success, first client echo failed
+        process second client //catch the moment first clients echo failed it is considered inactive
+
+        second client disconnected
+        process third client
+         */
+        staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+
+        when(firstClient.echo()).thenAnswer(delayedEchoFailedResponse(100, MILLISECONDS));
+        staleConnectionService.handleNewPassiveConnection(thirdClient, Lists.newArrayList(firstClient, secondClient));
+        //now second client should be processed
+        clientShouldBeNotified(secondClient, "Second client should be processed post first client echo failed");
+        clientShouldNotBeNotified(thirdClient, "Third client should not be processed while second client is active");
+        //disconnect second client
+        staleConnectionService.clientDisconnected(secondClient);
+        //now third client should be processed
+        clientShouldBeNotified(thirdClient, "Third client should be processed post second client disconnect also");
+    }
+
+    private void clientShouldBeClearedFromPendingList(OvsdbClient client, String msg) {
+        assertTrue(msg, !staleConnectionService.getPendingClients().containsKey(client));
+    }
+
+    private void clientShouldBeNotified(OvsdbClient client, String msg)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        assertEquals(msg, clientJobRunFutures.get(client).get(1, SECONDS), NOTIFIED);
+        clientShouldBeClearedFromPendingList(client, "client should be cleared from pending state");
+    }
+
+    private void clientShouldNotBeNotified(OvsdbClient client, String msg)
+            throws ExecutionException, InterruptedException {
+        try {
+            clientJobRunFutures.get(client).get(1, SECONDS);
+            fail(msg);
+        } catch (TimeoutException e) {
+            LOG.trace("Expected exception");
+        }
+    }
+
+    private Answer<Object> delayedEchoResponse(int delay, TimeUnit timeUnit) {
+        return (mock) -> Futures.scheduleAsync(() -> Futures.immediateFuture(null),
+                delay, timeUnit, scheduledExecutorService);
+    }
+
+    private Answer<Object> delayedEchoFailedResponse(int delay, TimeUnit timeUnit) {
+        return (mock) -> Futures.scheduleAsync(() -> Futures.immediateFailedFuture(new RuntimeException("Echo failed")),
+                delay, timeUnit, scheduledExecutorService);
+    }
+
+    private OvsdbClient createClient(String ip, int port) {
+        OvsdbClient ovsdbClient = mock(OvsdbClient.class);
+        clientJobRunFutures.put(ovsdbClient, SettableFuture.create());
+        when(ovsdbClient.echo()).thenAnswer(delayedEchoResponse(100, MILLISECONDS));
+        OvsdbConnectionInfo connectionInfo = mock(OvsdbConnectionInfo.class);
+        when(connectionInfo.toString()).thenReturn("Host:" + ip + ",Port:" + port);
+        when(ovsdbClient.getConnectionInfo()).thenReturn(connectionInfo);
+        return ovsdbClient;
+    }
+}