summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
1b4958b)
The disconnect() operation needs to inform handlers of state
transitions, which should not be delayed. Netty provides indirects these
calls silently on thread mismatch, which we do not want.
Make sure to schedule safelyDisconnect() on the event loop, so that that
it cannot run concurrently with other channel tasks.
JIRA: NETCONF-905
Change-Id: Iffe98db142f9c407fca9f92e5d336a0484ef1eff
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
@Override
public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if (isDisconnected.compareAndSet(false, true)) {
@Override
public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
if (isDisconnected.compareAndSet(false, true)) {
- safelyDisconnect(ctx, promise);
+ ctx.executor().execute(() -> safelyDisconnect(ctx, promise));
+ // This method has the potential to interact with the channel pipeline, for example via fireChannelInactive(). These
+ // callbacks need to complete during execution of this method and therefore this method needs to be executing on
+ // the channel's executor.
@SuppressWarnings("checkstyle:IllegalCatch")
private synchronized void safelyDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
@SuppressWarnings("checkstyle:IllegalCatch")
private synchronized void safelyDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
- LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}",
- ctx.channel(), connectPromise);
+ LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}", ctx.channel(),
+ connectPromise);
// If we have already succeeded and the session was dropped after,
// we need to fire inactive to notify reconnect logic
// If we have already succeeded and the session was dropped after,
// we need to fire inactive to notify reconnect logic
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.EventExecutor;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
private SocketAddress localAddress;
@Mock
private ChannelConfig channelConfig;
private SocketAddress localAddress;
@Mock
private ChannelConfig channelConfig;
+ @Mock
+ private EventExecutor executor;
private AsyncSshHandler asyncSshHandler;
private AsyncSshHandler asyncSshHandler;
doReturn(ctx).when(ctx).fireChannelInactive();
doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
doReturn(getMockedPromise()).when(ctx).newPromise();
doReturn(ctx).when(ctx).fireChannelInactive();
doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
doReturn(getMockedPromise()).when(ctx).newPromise();
+ doReturn(executor).when(ctx).executor();
+ doAnswer(invocation -> {
+ invocation.getArgument(0, Runnable.class).run();
+ return null;
+ }).when(executor).execute(any());
}
private void stubChannel() {
}
private void stubChannel() {