pending = this.dispatcher.createClient(this.address, cs, b, new AbstractDispatcher.PipelineInitializer<S>() {
@Override
public void initializeChannel(final SocketChannel channel, final Promise<S> promise) {
- initializer.initializeChannel(channel, promise);
-
// add closed channel handler
+ // This handler has to be added before initializer.initializeChannel is called
+ // Initializer might add some handlers using addFirst e.g. AsyncSshHandler and in that case
+ // closed channel handler is before the handler that invokes channel inactive event
channel.pipeline().addFirst(new ClosedChannelHandler(ReconnectPromise.this));
+
+ initializer.initializeChannel(channel, promise);
}
});
}
/**
* Channel handler that responds to channelInactive event and reconnects the session.
- * Only if the initial connection was successfully established and promise was not canceled.
+ * Only if the promise was not canceled.
*/
private static final class ClosedChannelHandler extends ChannelInboundHandlerAdapter {
private final ReconnectPromise<?, ?> promise;
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ // Pass info about disconnect further and then reconnect
+ super.channelInactive(ctx);
+
if (promise.isCancelled()) {
return;
}
- // Check if initial connection was fully finished. If the session was dropped during negotiation, reconnect will not happen.
- // Session can be dropped during negotiation on purpose by the client side and would make no sense to initiate reconnect
if (promise.isInitialConnectFinished() == false) {
- return;
+ LOG.debug("Connection to {} was dropped during negotiation, reattempting", promise.address);
}
LOG.debug("Reconnecting after connection to {} was dropped", promise.address);