import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.FutureListener;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
private NettyAwareChannelSubsystem channel;
private ClientSession session;
private ChannelPromise connectPromise;
- private GenericFutureListener negotiationFutureListener;
+ private FutureListener<Object> negotiationFutureListener;
public AsyncSshHandler(final AuthenticationHandler authenticationHandler, final NetconfSshClient sshClient,
final Future<?> negotiationFuture) {
negotiationFuture);
}
- private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) throws IOException {
- LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel());
-
- final ConnectFuture sshConnectionFuture = sshClient.connect(authenticationHandler.getUsername(), address)
- .verify(ctx.channel().config().getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
- sshConnectionFuture.addListener(future -> {
- if (future.isConnected()) {
- handleSshSessionCreated(future, ctx);
- } else {
- handleSshSetupFailure(ctx, future.getException());
- }
- });
- }
-
private synchronized void handleSshSessionCreated(final ConnectFuture future, final ChannelHandlerContext ctx) {
try {
LOG.trace("SSH session created on channel: {}", ctx.channel());
private synchronized void handleSshAuthenticated(final NettyAwareClientSession newSession,
final ChannelHandlerContext ctx) {
- try {
- LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(),
- newSession.getServerVersion());
+ LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(),
+ newSession.getServerVersion());
+ try {
channel = newSession.createSubsystemChannel(SUBSYSTEM, ctx);
channel.setStreaming(ClientChannel.Streaming.Async);
channel.open().addListener(future -> {
handleSshSetupFailure(ctx, future.getException());
}
});
-
-
} catch (final IOException e) {
handleSshSetupFailure(ctx, e);
}
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress,
- final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
- LOG.debug("SSH session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise);
- this.connectPromise = promise;
+ final SocketAddress localAddress, final ChannelPromise promise) throws IOException {
+ LOG.debug("SSH session connecting on channel {}. promise: {}", ctx.channel(), promise);
+ connectPromise = requireNonNull(promise);
if (negotiationFuture != null) {
negotiationFutureListener = future -> {
//complete connection promise with netconf negotiation future
negotiationFuture.addListener(negotiationFutureListener);
}
- startSsh(ctx, remoteAddress);
+
+ LOG.debug("Starting SSH to {} on channel: {}", remoteAddress, ctx.channel());
+ sshClient.connect(authenticationHandler.getUsername(), remoteAddress)
+ // FIXME: this is a blocking call, we should handle this with a concurrently-scheduled timeout. We do not
+ // have a Timer ready, so perhaps we should be using the event loop?
+ .verify(ctx.channel().config().getConnectTimeoutMillis(), TimeUnit.MILLISECONDS)
+ .addListener(future -> onConnectComplete(future, ctx));
+ }
+
+ private void onConnectComplete(final ConnectFuture future, final ChannelHandlerContext ctx) {
+ final var cause = future.getException();
+ if (cause != null) {
+ handleSshSetupFailure(ctx, cause);
+ return;
+ }
+
+ handleSshSessionCreated(future, ctx);
}
@Override