Check disconnected flag on each operation 40/102740/5
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 18 Oct 2022 14:44:39 +0000 (16:44 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 18 Oct 2022 15:03:09 +0000 (17:03 +0200)
We are performing a staggered setup during which we may encounter an
asynchronous disconnect. We have an atomic flag guarding this, which is
always flipped before disconnect enters critical region.

Check the flag at each synchronized connect stage and abort connection
if it is observed as flipped -- ensuring we cannot accidentally signal
channel activation after the channel has been close()d.

Also ditch the AtomicBoolean and instead use a simple VarHandle, saving
a few bytes of overhead.

JIRA: NETCONF-905
Change-Id: I857b1768b083c13ac5060a803a9ba6d4008c5e4b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java

index 5eaacb2d43fae05795cb788ba901dc39430d8cc6..98f049369b43f06c71479a10c43ca00a5a2b667d 100644 (file)
@@ -16,9 +16,10 @@ import io.netty.channel.ChannelPromise;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.Nullable;
@@ -37,6 +38,15 @@ import org.slf4j.LoggerFactory;
  */
 public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
+    private static final VarHandle DISCONNECTED;
+
+    static {
+        try {
+            DISCONNECTED = MethodHandles.lookup().findVarHandle(AsyncSshHandler.class, "disconnected", boolean.class);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
 
     public static final String SUBSYSTEM = "netconf";
 
@@ -58,7 +68,6 @@ public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
         DEFAULT_CLIENT = c;
     }
 
-    private final AtomicBoolean isDisconnected = new AtomicBoolean();
     private final AuthenticationHandler authenticationHandler;
     private final Future<?> negotiationFuture;
     private final NetconfSshClient sshClient;
@@ -72,6 +81,8 @@ public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
     private ClientSession session;
     private FutureListener<Object> negotiationFutureListener;
 
+    private volatile boolean disconnected;
+
     public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final NetconfSshClient sshClient,
             final Future<?> negotiationFuture) {
         this.authenticationHandler = requireNonNull(authenticationHandler);
@@ -168,6 +179,10 @@ public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             onOpenFailure(ctx, new AuthenticationFailedException("Authentication failed", cause));
             return;
         }
+        if (disconnected) {
+            LOG.debug("Skipping SSH subsystem allocation, channel: {}", ctx.channel());
+            return;
+        }
 
         LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(),
             clientSession.getServerVersion());
@@ -194,6 +209,10 @@ public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
             onOpenFailure(ctx, cause);
             return;
         }
+        if (disconnected) {
+            LOG.trace("Skipping activation, channel: {}", ctx.channel());
+            return;
+        }
 
         LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
         if (negotiationFuture == null) {
@@ -224,7 +243,7 @@ public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
 
     @Override
     public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
-        if (isDisconnected.compareAndSet(false, true)) {
+        if (DISCONNECTED.compareAndSet(this, false, true)) {
             ctx.executor().execute(() -> safelyDisconnect(ctx, promise));
         }
     }