import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Deprecated
final class ReconnectPromise<S extends ProtocolSession<?>, L extends SessionListener<?, ?, ?>> extends DefaultPromise<Void> {
private static final Logger LOG = LoggerFactory.getLogger(ReconnectPromise.class);
pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
+ initializer.initializeChannel(channel, promise);
// add closed channel handler
- // This handler has to be added before initializer.initializeChannel is called
- // Initializer might add some handlers using addFirst e.g. AsyncSshHandler and in that case
- // closed channel handler is before the handler that invokes channel inactive event
- channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
+ // This handler has to be added as last channel handler and the channel inactive event has to be caught by it
+ // Handlers in front of it can react to channelInactive event, but have to forward the event or the reconnect will not work
+ // This handler is last so all handlers in front of it can handle channel inactive (to e.g. resource cleanup) before a new connection is started
+ channel.pipeline().addLast(new ClosedChannelHandler(ReconnectPromise.this));
+ }
+ });
- initializer.initializeChannel(channel, promise);
+ pending.addListener(new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ ReconnectPromise.this.setFailure(future.cause());
+ }
}
});
}
/**
* Channel handler that responds to channelInactive event and reconnects the session.
- * Only if the initial connection was successfully established and promise was not canceled.
+ * Only if the promise was not canceled.
*/
private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
private final ReconnectPromise<?, ?> promise;
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
- // Pass info about disconnect further and then reconnect
- super.channelInactive(ctx);
-
+ // This is the ultimate channel inactive handler, not forwarding
if (promise.isCancelled()) {
return;
}
- // Check if initial connection was fully finished. If the session was dropped during negotiation, reconnect will not happen.
- // Session can be dropped during negotiation on purpose by the client side and would make no sense to initiate reconnect
if (promise.isInitialConnectFinished() == false) {
- return;
+ LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
}
LOG.debug("Reconnecting after connection to {} was dropped", promise.address);