import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @author Vinh Nguyen (vinh.nguyen@hcl.com) on 6/10/16.
*/
public class StalePassiveConnectionService implements AutoCloseable {
+
private static final Logger LOG = LoggerFactory.getLogger(StalePassiveConnectionService.class);
- private static Map<OvsdbClient, Map<OvsdbClient, SettableFuture<List<String>>>> pendingConnectionClients =
- new ConcurrentHashMap<>();
+ private final Map<OvsdbClient, Set<OvsdbClient>> pendingClients = new ConcurrentHashMap<>();
+ private final Function<OvsdbClient, Void> clientNotificationCallback;
- private final ScheduledExecutorService executorService;
- private static final int ECHO_TIMEOUT = 10;
+ public StalePassiveConnectionService(Function<OvsdbClient, Void> clientNotificationCallback) {
+ this.clientNotificationCallback = clientNotificationCallback;
+ }
- public StalePassiveConnectionService(final ScheduledExecutorService executorService) {
- this.executorService = executorService;
+ public Map<OvsdbClient, Set<OvsdbClient>> getPendingClients() {
+ return new HashMap<>(pendingClients);
}
/**
*/
public void handleNewPassiveConnection(final OvsdbClient newOvsdbClient,
final List<OvsdbClient> clientsFromSameNode) {
- final Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap = new ConcurrentHashMap<>();
- pendingConnectionClients.put(newOvsdbClient, clientFutureMap);
-
- // scheduled task for ping response timeout. Connections that don't response to the
- // ping or haven't disconnected after the timeout will be closed
- final ScheduledFuture<?> echoTimeoutFuture =
- executorService.schedule(() -> {
- for (Entry<OvsdbClient, SettableFuture<List<String>>> entry : clientFutureMap.entrySet()) {
- OvsdbClient client = entry.getKey();
- Future<?> clientFuture = entry.getValue();
- if (!clientFuture.isDone() && !clientFuture.isCancelled()) {
- clientFuture.cancel(true);
- }
- if (client.isActive()) {
- client.disconnect();
- }
+ LOG.info("Adding client to pending list {}", newOvsdbClient.getConnectionInfo());
+ pendingClients.put(newOvsdbClient, new HashSet<>());
+ /*
+ if old client echo succeeds
+ do not notify new client as it has to wait
+ else
+ if all old clients got disconnected/echo failed notify the new client
+ */
+ for (final OvsdbClient oldClient : clientsFromSameNode) {
+ pendingClients.get(newOvsdbClient).add(oldClient);
+ LOG.info("Echo testing client {}", oldClient.getConnectionInfo());
+ Futures.addCallback(oldClient.echo(),
+ new FutureCallback<List<String>>() {
+ @Override
+ public void onSuccess(List<String> result) {
+ //old client still active
+ LOG.info("Echo testing of old client {} succeeded", oldClient.getConnectionInfo());
}
- }, ECHO_TIMEOUT, TimeUnit.SECONDS);
- // for every connection create a SettableFuture, save it to 'clientFutureMap', and send a ping (echo).
- // The ping results in either:
- // 1. ping response returns - the connection is active
- // 2. the netty connection is closed due to IO exception -
- // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives
- // If the map is empty we proceed with new connection process
- for (final OvsdbClient client : clientsFromSameNode) {
- SettableFuture<List<String>> clientFuture = SettableFuture.create();
- clientFutureMap.put(client, clientFuture);
- Futures.addCallback(clientFuture,
- createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture));
- Futures.addCallback(client.echo(),
- createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture));
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.info("Echo testing of old client {} failed, disconnect and notify clients",
+ oldClient.getConnectionInfo());
+ //disconnect the old client to cleanup, so that new connection can proceed
+ oldClient.disconnect();
+ onInactiveClient(oldClient);
+ }
+ }, MoreExecutors.directExecutor());
}
}
* Notify the service that the given client has disconnected.
* @param disconnectedClient the client just disconnected
*/
- public void clientDisconnected(OvsdbClient disconnectedClient) {
- for (Entry<OvsdbClient, Map<OvsdbClient, SettableFuture<List<String>>>> entry :
- pendingConnectionClients.entrySet()) {
- OvsdbClient pendingClient = entry.getKey();
+ public synchronized void clientDisconnected(OvsdbClient disconnectedClient) {
+ LOG.info("Client disconnected {}", disconnectedClient.getConnectionInfo());
+ onInactiveClient(disconnectedClient);
+ }
- // set the future result for pending connections that wait for this client to be disconnected
- if (pendingClient.getConnectionInfo().getRemoteAddress()
- .equals(disconnectedClient.getConnectionInfo().getRemoteAddress())) {
- Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap = entry.getValue();
- SettableFuture<List<String>> future = clientFutureMap.get(disconnectedClient);
- if (future != null) {
- future.set(null);
- }
- }
+ public synchronized void onInactiveClient(OvsdbClient disconnectedClient) {
+ pendingClients.remove(disconnectedClient);
+ pendingClients.entrySet().stream().forEach(entry -> entry.getValue().remove(disconnectedClient));
+ Optional<OvsdbClient> clientOptional = pendingClients.entrySet().stream()
+ .filter(entry -> entry.getValue().isEmpty())
+ .map(entry -> entry.getKey())
+ .findFirst();
+ if (clientOptional.isPresent()) {
+ LOG.info("Sending notification for client {}", clientOptional.get().getConnectionInfo());
+ pendingClients.remove(clientOptional.get());
+ clientNotificationCallback.apply(clientOptional.get());
}
}
@Override
public void close() {
}
-
- private FutureCallback<List<String>> createStaleConnectionFutureCallback(
- final OvsdbClient cbForClient, final OvsdbClient newClient,
- final Map<OvsdbClient, SettableFuture<List<String>>> clientFutureMap,
- final ScheduledFuture<?> echoTimeoutFuture) {
- return new FutureCallback<List<String>>() {
- @Override
- public void onSuccess(List<String> result) {
- // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives
- // If the map is empty we proceed with new connection process
- clientFutureMap.remove(cbForClient);
- if (clientFutureMap.isEmpty()) {
- if (!echoTimeoutFuture.isDone() && !echoTimeoutFuture.isCancelled()) {
- echoTimeoutFuture.cancel(true);
- }
- OvsdbConnectionService.notifyListenerForPassiveConnection(newClient);
- pendingConnectionClients.remove(newClient);
- }
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- LOG.error("Error in checking stale connections)", throwable);
- }
- };
- }
}