@Override
public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if (isDisconnected.compareAndSet(false, true)) {
- safelyDisconnect(ctx, promise);
+ ctx.executor().execute(() -> safelyDisconnect(ctx, promise));
}
}
+ // This method has the potential to interact with the channel pipeline, for example via fireChannelInactive(). These
+ // callbacks need to complete during execution of this method and therefore this method needs to be executing on
+ // the channel's executor.
@SuppressWarnings("checkstyle:IllegalCatch")
private synchronized void safelyDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
- LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}",
- ctx.channel(), connectPromise);
+ LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}", ctx.channel(),
+ connectPromise);
// If we have already succeeded and the session was dropped after,
// we need to fire inactive to notify reconnect logic
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.EventExecutor;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
private SocketAddress localAddress;
@Mock
private ChannelConfig channelConfig;
+ @Mock
+ private EventExecutor executor;
private AsyncSshHandler asyncSshHandler;
doReturn(ctx).when(ctx).fireChannelInactive();
doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
doReturn(getMockedPromise()).when(ctx).newPromise();
+ doReturn(executor).when(ctx).executor();
+ doAnswer(invocation -> {
+ invocation.getArgument(0, Runnable.class).run();
+ return null;
+ }).when(executor).execute(any());
}
private void stubChannel() {