import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.net.SocketAddress;
+import java.time.Duration;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.Nullable;
/**
* Netty SSH handler class. Acts as interface between Netty and SSH library.
*/
-public class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
+public final class AsyncSshHandler extends ChannelOutboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class);
+ private static final VarHandle DISCONNECTED;
+
+ static {
+ try {
+ DISCONNECTED = MethodHandles.lookup().findVarHandle(AsyncSshHandler.class, "disconnected", boolean.class);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
public static final String SUBSYSTEM = "netconf";
public static final NetconfSshClient DEFAULT_CLIENT;
static {
- final NetconfSshClient c = new NetconfClientBuilder().build();
+ final var c = new NetconfClientBuilder().build();
// Disable default timeouts from mina sshd
- c.getProperties().put(CoreModuleProperties.AUTH_TIMEOUT.getName(), "0");
- c.getProperties().put(CoreModuleProperties.IDLE_TIMEOUT.getName(), "0");
- c.getProperties().put(CoreModuleProperties.NIO2_READ_TIMEOUT.getName(), "0");
- c.getProperties().put(CoreModuleProperties.TCP_NODELAY.getName(), true);
+ final var zero = Duration.ofMillis(0);
+ CoreModuleProperties.AUTH_TIMEOUT.set(c, zero);
+ CoreModuleProperties.IDLE_TIMEOUT.set(c, zero);
+ CoreModuleProperties.NIO2_READ_TIMEOUT.set(c, zero);
+ CoreModuleProperties.TCP_NODELAY.set(c, true);
// TODO make configurable, or somehow reuse netty threadpool
c.setNioWorkers(SSH_DEFAULT_NIO_WORKERS);
DEFAULT_CLIENT = c;
}
- private final AtomicBoolean isDisconnected = new AtomicBoolean();
private final AuthenticationHandler authenticationHandler;
private final Future<?> negotiationFuture;
private final NetconfSshClient sshClient;
private ClientSession session;
private FutureListener<Object> negotiationFutureListener;
+ private volatile boolean disconnected;
+
public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final NetconfSshClient sshClient,
final Future<?> negotiationFuture) {
this.authenticationHandler = requireNonNull(authenticationHandler);
onOpenFailure(ctx, new AuthenticationFailedException("Authentication failed", cause));
return;
}
+ if (disconnected) {
+ LOG.debug("Skipping SSH subsystem allocation, channel: {}", ctx.channel());
+ return;
+ }
LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(),
clientSession.getServerVersion());
return;
}
- openFuture.addListener(future -> onOpenComplete(future, ctx));
+ openFuture.addListener(future -> ctx.executor().execute(() -> onOpenComplete(future, ctx)));
}
+ // This callback has to run on the channel's executor because it runs fireChannelActive(), which needs to be
+ // delivered synchronously. If we were to execute on some other thread we would end up delaying the event,
+ // potentially creating havoc in the pipeline.
private synchronized void onOpenComplete(final OpenFuture openFuture, final ChannelHandlerContext ctx) {
final var cause = openFuture.getException();
if (cause != null) {
onOpenFailure(ctx, cause);
return;
}
+ if (disconnected) {
+ LOG.trace("Skipping activation, channel: {}", ctx.channel());
+ return;
+ }
LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel());
if (negotiationFuture == null) {
@Override
public void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
- if (isDisconnected.compareAndSet(false, true)) {
+ if (DISCONNECTED.compareAndSet(this, false, true)) {
ctx.executor().execute(() -> safelyDisconnect(ctx, promise));
}
}