Run safelyDisconnect() on event loop 21/102721/5
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 18 Oct 2022 09:10:15 +0000 (11:10 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 18 Oct 2022 11:57:25 +0000 (13:57 +0200)
The disconnect() operation needs to inform handlers of state
transitions, which should not be delayed. Netty provides indirects these
calls silently on thread mismatch, which we do not want.

Make sure to schedule safelyDisconnect() on the event loop, so that that
it cannot run concurrently with other channel tasks.

JIRA: NETCONF-905
Change-Id: Iffe98db142f9c407fca9f92e5d336a0484ef1eff
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java
netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerTest.java

index c8312e9419af95fe90a91c8130f35c74319a0ff1..fa4796c48f8b68a0aa6a1b964faf2c9bcc2a54fd 100644 (file)
@@ -230,14 +230,17 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     @Override
     public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
         if (isDisconnected.compareAndSet(false, true)) {
-            safelyDisconnect(ctx, promise);
+            ctx.executor().execute(() -> safelyDisconnect(ctx, promise));
         }
     }
 
+    // This method has the potential to interact with the channel pipeline, for example via fireChannelInactive(). These
+    // callbacks need to complete during execution of this method and therefore this method needs to be executing on
+    // the channel's executor.
     @SuppressWarnings("checkstyle:IllegalCatch")
     private synchronized void safelyDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
-        LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}",
-                ctx.channel(), connectPromise);
+        LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}", ctx.channel(),
+            connectPromise);
 
         // If we have already succeeded and the session was dropped after,
         // we need to fire inactive to notify reconnect logic
index fb2fd690f9990be6dd050fb0d6f1bd76b9f9d078..b3f085b2930d83507eebddde3e26e877400c964d 100644 (file)
@@ -32,6 +32,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.EventExecutor;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
@@ -76,6 +77,8 @@ public class AsyncSshHandlerTest {
     private SocketAddress localAddress;
     @Mock
     private ChannelConfig channelConfig;
+    @Mock
+    private EventExecutor executor;
 
     private AsyncSshHandler asyncSshHandler;
 
@@ -136,6 +139,11 @@ public class AsyncSshHandlerTest {
         doReturn(ctx).when(ctx).fireChannelInactive();
         doReturn(mock(ChannelFuture.class)).when(ctx).disconnect(any(ChannelPromise.class));
         doReturn(getMockedPromise()).when(ctx).newPromise();
+        doReturn(executor).when(ctx).executor();
+        doAnswer(invocation -> {
+            invocation.getArgument(0, Runnable.class).run();
+            return null;
+        }).when(executor).execute(any());
     }
 
     private void stubChannel() {