<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("OVSDBConnNotifSer-%d").build());
private static final StalePassiveConnectionService STALE_PASSIVE_CONNECTION_SERVICE =
- new StalePassiveConnectionService(EXECUTOR_SERVICE);
+ new StalePassiveConnectionService((client) -> {
+ notifyListenerForPassiveConnection(client);
+ return null;
+ });
private static final Set<OvsdbConnectionListener> CONNECTION_LISTENERS = ConcurrentHashMap.newKeySet();
private static final Map<OvsdbClient, Channel> CONNECTIONS = new ConcurrentHashMap<>();
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CancellationException;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @author Vinh Nguyen (vinh.nguyen@hcl.com) on 6/10/16.
*/
public class StalePassiveConnectionService implements AutoCloseable {
+
private static final Logger LOG = LoggerFactory.getLogger(StalePassiveConnectionService.class);
- private static Map<OvsdbClient, Map<OvsdbClient, SettableFuture<List<String>>>> pendingConnectionClients =
- new ConcurrentHashMap<>();
+ private final Map<OvsdbClient, Set<OvsdbClient>> pendingClients = new ConcurrentHashMap<>();
+ private final Function<OvsdbClient, Void> clientNotificationCallback;
- private final ScheduledExecutorService executorService;
- private static final int ECHO_TIMEOUT = 10;
+ public StalePassiveConnectionService(Function<OvsdbClient, Void> clientNotificationCallback) {
+ this.clientNotificationCallback = clientNotificationCallback;
+ }
- public StalePassiveConnectionService(final ScheduledExecutorService executorService) {
- this.executorService = executorService;
+ public Map<OvsdbClient, Set<OvsdbClient>> getPendingClients() {
+ return new HashMap(pendingClients);
}
/**
*/
public void handleNewPassiveConnection(final OvsdbClient newOvsdbClient,
final List<OvsdbClient> clientsFromSameNode) {
- final Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap = new ConcurrentHashMap<>();
- pendingConnectionClients.put(newOvsdbClient, clientFutureMap);
-
- // scheduled task for ping response timeout. Connections that don't response to the
- // ping or haven't disconnected after the timeout will be closed
- final ScheduledFuture<?> echoTimeoutFuture =
- executorService.schedule(() -> {
- for (Entry<OvsdbClient, SettableFuture<List<String>>> entry : clientFutureMap.entrySet()) {
- OvsdbClient client = entry.getKey();
- Future<?> clientFuture = entry.getValue();
- if (!clientFuture.isDone() && !clientFuture.isCancelled()) {
- clientFuture.cancel(true);
- }
- if (client.isActive()) {
- client.disconnect();
- }
+ LOG.info("Adding client to pending list {}", newOvsdbClient.getConnectionInfo());
+ pendingClients.put(newOvsdbClient, new HashSet<>());
+ /*
+ if old client echo succeeds
+ do not notify new client as it has to wait
+ else
+ if all old clients got disconnected/echo failed notify the new client
+ */
+ for (final OvsdbClient oldClient : clientsFromSameNode) {
+ pendingClients.get(newOvsdbClient).add(oldClient);
+ LOG.info("Echo testing client {}", oldClient.getConnectionInfo());
+ Futures.addCallback(oldClient.echo(),
+ new FutureCallback<List<String>>() {
+ @Override
+ public void onSuccess(List<String> result) {
+ //old client still active
+ LOG.info("Echo testing of old client {} succeded", oldClient.getConnectionInfo());
}
- }, ECHO_TIMEOUT, TimeUnit.SECONDS);
- // for every connection create a SettableFuture, save it to 'clientFutureMap', and send a ping (echo).
- // The ping results in either:
- // 1. ping response returns - the connection is active
- // 2. the netty connection is closed due to IO exception -
- // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives
- // If the map is empty we proceed with new connection process
- for (final OvsdbClient client : clientsFromSameNode) {
- SettableFuture<List<String>> clientFuture = SettableFuture.create();
- clientFutureMap.put(client, clientFuture);
- Futures.addCallback(clientFuture,
- createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture));
- Futures.addCallback(client.echo(),
- createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture));
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.info("Echo testing of old client {} failed", oldClient.getConnectionInfo());
+ onInactiveClient(oldClient);
+ }
+ });
}
}
* Notify the service that the given client has disconnected.
* @param disconnectedClient the client just disconnected
*/
- public void clientDisconnected(OvsdbClient disconnectedClient) {
- for (Entry<OvsdbClient, Map<OvsdbClient, SettableFuture<List<String>>>> entry :
- pendingConnectionClients.entrySet()) {
- OvsdbClient pendingClient = entry.getKey();
+ public synchronized void clientDisconnected(OvsdbClient disconnectedClient) {
+ LOG.info("Client disconnected {}", disconnectedClient.getConnectionInfo());
+ onInactiveClient(disconnectedClient);
+ }
- // set the future result for pending connections that wait for this client to be disconnected
- if (pendingClient.getConnectionInfo().getRemoteAddress()
- .equals(disconnectedClient.getConnectionInfo().getRemoteAddress())) {
- Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap = entry.getValue();
- SettableFuture<List<String>> future = clientFutureMap.get(disconnectedClient);
- if (future != null) {
- future.set(null);
- }
- }
+ public synchronized void onInactiveClient(OvsdbClient disconnectedClient) {
+ pendingClients.remove(disconnectedClient);
+ pendingClients.entrySet().stream().forEach(entry -> entry.getValue().remove(disconnectedClient));
+ Optional<OvsdbClient> clientOptional = pendingClients.entrySet().stream()
+ .filter(entry -> entry.getValue().isEmpty())
+ .map(entry -> entry.getKey())
+ .findFirst();
+ if (clientOptional.isPresent()) {
+ LOG.info("Sending notification for client {}", clientOptional.get().getConnectionInfo());
+ pendingClients.remove(clientOptional.get());
+ clientNotificationCallback.apply(clientOptional.get());
}
}
@Override
public void close() {
}
-
- private FutureCallback<List<String>> createStaleConnectionFutureCallback(
- final OvsdbClient cbForClient, final OvsdbClient newClient,
- final Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap,
- final ScheduledFuture<?> echoTimeoutFuture) {
- return new FutureCallback<List<String>>() {
- @Override
- public void onSuccess(List<String> result) {
- // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives
- // If the map is empty we proceed with new connection process
- clientFutureMap.remove(cbForClient);
- if (clientFutureMap.isEmpty()) {
- if (!echoTimeoutFuture.isDone() && !echoTimeoutFuture.isCancelled()) {
- echoTimeoutFuture.cancel(true);
- }
- OvsdbConnectionService.notifyListenerForPassiveConnection(newClient);
- pendingConnectionClients.remove(newClient);
- }
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- if (!(throwable instanceof CancellationException)) {
- LOG.error("Error in checking stale connections)", throwable);
- return;
- }
-
- LOG.debug("Ping for {} was cancelled as a result of timeout. Assuming client connection is gone.",
- cbForClient);
-
- if (cbForClient.isActive()) {
- cbForClient.disconnect();
- }
-
- clientFutureMap.remove(cbForClient);
-
- if (clientFutureMap.isEmpty()) {
- OvsdbConnectionService.notifyListenerForPassiveConnection(newClient);
- pendingConnectionClients.remove(newClient);
- }
- }
- };
- }
-}
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2018 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.ovsdb.lib;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.ovsdb.lib.impl.StalePassiveConnectionService;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ StalePassiveConnectionService.class })
+public class StalePassiveConnectionServiceTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StalePassiveConnectionService.class);
+ private static final String NOTIFIED = "NOTIFIED";
+ private Map<OvsdbClient, SettableFuture<String>> clientJobRunFutures = new HashMap<>();
+ private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ private StalePassiveConnectionService staleConnectionService
+ = new StalePassiveConnectionService((client) -> {
+ if (clientJobRunFutures.get(client) != null) {
+ clientJobRunFutures.get(client).set(NOTIFIED);
+ }
+ return null;
+ });
+
+ private OvsdbClient firstClient = createClient("127.0.0.1", 8001);
+ private OvsdbClient secondClient = createClient("127.0.0.1", 8002);
+ private OvsdbClient thirdClient = createClient("127.0.0.1", 8003);
+
+ @Test
+ public void testFirstClientAlive() throws Exception {
+ staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+ clientShouldNotBeNotified(secondClient, "Second client should not be processed while first client is active");
+ staleConnectionService.clientDisconnected(firstClient);
+ clientShouldBeNotified(secondClient, "Second client should be notified after first client disconnect");
+ }
+
+ @Test
+ public void testSecondClientDisconnect() throws Exception {
+ staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+ clientShouldNotBeNotified(secondClient, "Second client should not be processed while first client is active");
+ staleConnectionService.clientDisconnected(secondClient);
+ clientShouldNotBeNotified(secondClient, "Second client should not be processed post its disconnect");
+ clientShouldBeClearedFromPendingList(secondClient, "Second client should be cleared from pending state");
+ }
+
+ @Test
+ public void testFirstClientInActive() throws Exception {
+ firstClient.echo();
+ when(firstClient.echo()).thenAnswer(delayedEchoFailedResponse(100, MILLISECONDS));
+ staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+ clientShouldBeNotified(secondClient, "Second client should be notified after first client echo failed");
+ clientShouldBeClearedFromPendingList(secondClient, "Second client should be cleared from pending state");
+ }
+
+ @Test
+ public void testThreeClients() throws Exception {
+ staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+ staleConnectionService.handleNewPassiveConnection(thirdClient, Lists.newArrayList(firstClient, secondClient));
+ clientShouldNotBeNotified(thirdClient, "Third client should not be processed while first client is there");
+ clientShouldNotBeNotified(secondClient, "Second client should not be processed while first client is there");
+
+ //disconnect first client
+ staleConnectionService.clientDisconnected(firstClient);
+ //now second client should be processed
+ clientShouldBeNotified(secondClient, "Second client should be notified after first client disconnected");
+ clientShouldNotBeNotified(thirdClient, "Third client should not be processed while second client is active");
+
+ //disconnect second client
+ staleConnectionService.clientDisconnected(secondClient);
+ //now third client should be processed
+ clientShouldBeNotified(secondClient, "Third client should be notified after second client also disconnected");
+ }
+
+ @Test
+ public void testDelayedFirstClientFailure() throws Exception {
+ /*
+ first client arrived
+ second client arrived
+ first client echo success
+ keep second client on wait list
+
+ third client arrived
+ second client echo success, first client echo failed
+ process second client //catch the moment first clients echo failed it is considered inactive
+
+ second client disconnected
+ process third client
+ */
+ staleConnectionService.handleNewPassiveConnection(secondClient, Lists.newArrayList(firstClient));
+
+ when(firstClient.echo()).thenAnswer(delayedEchoFailedResponse(100, MILLISECONDS));
+ staleConnectionService.handleNewPassiveConnection(thirdClient, Lists.newArrayList(firstClient, secondClient));
+ //now second client should be processed
+ clientShouldBeNotified(secondClient, "Second client should be processed post first client echo failed");
+ clientShouldNotBeNotified(thirdClient, "Third client should not be processed while second client is active");
+ //disconnect second client
+ staleConnectionService.clientDisconnected(secondClient);
+ //now third client should be processed
+ clientShouldBeNotified(thirdClient, "Third client should be processed post second client disconnect also");
+ }
+
+ private void clientShouldBeClearedFromPendingList(OvsdbClient client, String msg) {
+ assertTrue(msg, !staleConnectionService.getPendingClients().containsKey(client));
+ }
+
+ private void clientShouldBeNotified(OvsdbClient client, String msg)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ assertEquals(msg, clientJobRunFutures.get(client).get(1, SECONDS), NOTIFIED);
+ clientShouldBeClearedFromPendingList(client, "client should be cleared from pending state");
+ }
+
+ private void clientShouldNotBeNotified(OvsdbClient client, String msg)
+ throws ExecutionException, InterruptedException {
+ try {
+ clientJobRunFutures.get(client).get(1, SECONDS);
+ fail(msg);
+ } catch (TimeoutException e) {
+ LOG.trace("Expected exception");
+ }
+ }
+
+ private Answer<Object> delayedEchoResponse(int delay, TimeUnit timeUnit) {
+ return (mock) -> Futures.scheduleAsync(() -> Futures.immediateFuture(null),
+ delay, timeUnit, scheduledExecutorService);
+ }
+
+ private Answer<Object> delayedEchoFailedResponse(int delay, TimeUnit timeUnit) {
+ return (mock) -> Futures.scheduleAsync(() -> Futures.immediateFailedFuture(new RuntimeException("Echo failed")),
+ delay, timeUnit, scheduledExecutorService);
+ }
+
+ private OvsdbClient createClient(String ip, int port) {
+ OvsdbClient ovsdbClient = mock(OvsdbClient.class);
+ clientJobRunFutures.put(ovsdbClient, SettableFuture.create());
+ when(ovsdbClient.echo()).thenAnswer(delayedEchoResponse(100, MILLISECONDS));
+ OvsdbConnectionInfo connectionInfo = mock(OvsdbConnectionInfo.class);
+ when(connectionInfo.toString()).thenReturn("Host:" + ip + ",Port:" + port);
+ when(ovsdbClient.getConnectionInfo()).thenReturn(connectionInfo);
+ return ovsdbClient;
+ }
+}