/* * Copyright (c) 2016 , NEC Corporation and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.ovsdb.lib.impl; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.opendaylight.ovsdb.lib.OvsdbClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * StalePassiveConnectionService provides functionalities to clean up stale passive connections * from the same node before new connection request arrives, especially for connection flapping scenarios. * *

When new connection arrives all connections from the same node are pinged. The pings cause * the stale netty connections to close due to IOException. Those have not been closed after a timeout * will be closed programmatically. New connection request handling is then proceeded after all * stale connections are cleaned up in the OvsdbConnectionService * * @author Vinh Nguyen (vinh.nguyen@hcl.com) on 6/10/16. */ public class StalePassiveConnectionService implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(StalePassiveConnectionService.class); private static Map> pendingConnectionClients = new ConcurrentHashMap<>(); private final ScheduledExecutorService executorService; private static final int ECHO_TIMEOUT = 10; public StalePassiveConnectionService(final ScheduledExecutorService executorService) { this.executorService = executorService; } /** * This method makes sure that all stale connections from the same node are properly cleaned up before processing * new connection request. * * @param newOvsdbClient the connecting OvsdbClient * @param clientsFromSameNode list of existing OvsdbClients from the same node as the new OvsdbClient */ public void handleNewPassiveConnection(final OvsdbClient newOvsdbClient, final List clientsFromSameNode) { final Map clientFutureMap = new ConcurrentHashMap<>(); pendingConnectionClients.put(newOvsdbClient, clientFutureMap); // scheduled task for ping response timeout. Connections that don't response to the // ping or haven't disconnected after the timeout will be closed final ScheduledFuture echoTimeoutFuture = executorService.schedule(new Runnable() { @Override public void run() { for (OvsdbClient client : clientFutureMap.keySet()) { Future clientFuture = clientFutureMap.get(client); if ( !clientFuture.isDone() && !clientFuture.isCancelled()) { clientFuture.cancel(true); } if (client.isActive()) { client.disconnect(); } } } }, ECHO_TIMEOUT, TimeUnit.SECONDS); // for every connection create a SettableFuture, save it to 'clientFutureMap', and send a ping (echo). // The ping results in either: // 1. ping response returns - the connection is active // 2. the netty connection is closed due to IO exception - // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives // If the map is empty we proceed with new connection process for (final OvsdbClient client : clientsFromSameNode) { SettableFuture clientFuture = SettableFuture.create(); clientFutureMap.put(client, clientFuture); Futures.addCallback(clientFuture, createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture)); Futures.addCallback(client.echo(), createStaleConnectionFutureCallback(client, newOvsdbClient, clientFutureMap, echoTimeoutFuture)); } } /** * Notify the service that the given client has disconnected. * @param disconnectedClient the client just disconnected */ public void clientDisconnected(OvsdbClient disconnectedClient) { for (OvsdbClient pendingClient : pendingConnectionClients.keySet()) { // set the future result for pending connections that wait for this client to be disconnected if (pendingClient.getConnectionInfo().getRemoteAddress() .equals(disconnectedClient.getConnectionInfo().getRemoteAddress())) { Map clientFutureMap = pendingConnectionClients.get(pendingClient); if (clientFutureMap.containsKey(disconnectedClient)) { clientFutureMap.get(disconnectedClient).set(null); } } } } @Override public void close() { } private FutureCallback> createStaleConnectionFutureCallback( final OvsdbClient cbForClient, final OvsdbClient newClient, final Map clientFutureMap, final ScheduledFuture echoTimeoutFuture) { return new FutureCallback>() { @Override public void onSuccess(List result) { // The future is removed from the 'clientFutureMap' when the onSuccess event for each future arrives // If the map is empty we proceed with new connection process clientFutureMap.remove(cbForClient); if (clientFutureMap.isEmpty()) { if (!echoTimeoutFuture.isDone() && !echoTimeoutFuture.isCancelled()) { echoTimeoutFuture.cancel(true); } OvsdbConnectionService.notifyListenerForPassiveConnection(newClient); pendingConnectionClients.remove(newClient); } } @Override public void onFailure(Throwable throwable) { LOG.error("Error in checking stale connections)", throwable); } }; } }