X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandler.java;h=14d753f1f8b59c25a7ea871e3bfe5e430c1d9970;hp=3bd72320232bb7f912a316469b816fa76952f0c4;hb=a2563a94253f9c2603e0ab25b8f412ea07fcf51d;hpb=879a57936375ca3dec48c5bf52b0b5988c807bae diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 3bd7232023..14d753f1f8 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -31,7 +32,7 @@ import org.slf4j.LoggerFactory; */ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { - private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class); public static final String SUBSYSTEM = "netconf"; public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient(); @@ -47,7 +48,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private final AuthenticationHandler authenticationHandler; private final SshClient sshClient; - private AsyncSshHanderReader sshReadAsyncListener; + private AsyncSshHandlerReader sshReadAsyncListener; private AsyncSshHandlerWriter sshWriteAsyncHandler; private ClientChannel channel; @@ -73,7 +74,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) { - logger.debug("Starting SSH to {} on channel: {}", address, ctx.channel()); + LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel()); final ConnectFuture sshConnectionFuture = sshClient.connect(authenticationHandler.getUsername(), address); sshConnectionFuture.addListener(new SshFutureListener() { @@ -90,7 +91,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private synchronized void handleSshSessionCreated(final ConnectFuture future, final ChannelHandlerContext ctx) { try { - logger.trace("SSH session created on channel: {}", ctx.channel()); + LOG.trace("SSH session created on channel: {}", ctx.channel()); session = future.getSession(); final AuthFuture authenticateFuture = authenticationHandler.authenticate(session); @@ -111,7 +112,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private synchronized void handleSshAuthenticated(final ClientSession session, final ChannelHandlerContext ctx) { try { - logger.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion()); + LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion()); channel = session.createSubsystemChannel(SUBSYSTEM); channel.setStreaming(ClientChannel.Streaming.Async); @@ -133,12 +134,25 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) { - logger.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel()); + LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel()); connectPromise.setSuccess(); connectPromise = null; - sshReadAsyncListener = new AsyncSshHanderReader(this, ctx, channel.getAsyncOut()); + // TODO we should also read from error stream and at least log from that + + sshReadAsyncListener = new AsyncSshHandlerReader(new AutoCloseable() { + @Override + public void close() throws Exception { + AsyncSshHandler.this.disconnect(ctx, ctx.newPromise()); + } + }, new AsyncSshHandlerReader.ReadMsgHandler() { + @Override + public void onMessageRead(final ByteBuf msg) { + ctx.fireChannelRead(msg); + } + }, channel.toString(), channel.getAsyncOut()); + // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null if(channel != null) { sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn()); @@ -147,7 +161,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { - logger.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e); + LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e); connectPromise.setFailure(e); connectPromise = null; throw new IllegalStateException("Unable to setup SSH connection on channel: " + ctx.channel(), e); @@ -194,7 +208,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { channel = null; promise.setSuccess(); - logger.debug("SSH session closed on channel: {}", ctx.channel()); + LOG.debug("SSH session closed on channel: {}", ctx.channel()); ctx.fireChannelInactive(); }