X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandler.java;h=064ae72bc7c512db762a45007a73e9aeebebbe45;hb=edcc020c8fda4b13f22a31d79c13feef0b53b0ee;hp=2761a45d03bedc8730cb69036748c266cdc7f412;hpb=54916d10764a5234881040e9a68ae35d63b3dac9;p=controller.git diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 2761a45d03..064ae72bc7 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -24,9 +23,6 @@ import org.apache.sshd.client.future.ConnectFuture; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; -import org.apache.sshd.common.io.IoInputStream; -import org.apache.sshd.common.io.IoReadFuture; -import org.apache.sshd.common.util.Buffer; import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +32,7 @@ import org.slf4j.LoggerFactory; */ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { - private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class); public static final String SUBSYSTEM = "netconf"; public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient(); @@ -52,11 +48,14 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private final AuthenticationHandler authenticationHandler; private final SshClient sshClient; - private SshReadAsyncListener sshReadAsyncListener; + private AsyncSshHandlerReader sshReadAsyncListener; + private AsyncSshHandlerWriter sshWriteAsyncHandler; + private ClientChannel channel; private ClientSession session; private ChannelPromise connectPromise; + public static AsyncSshHandler createForNetconfSubsystem(final AuthenticationHandler authenticationHandler) throws IOException { return new AsyncSshHandler(authenticationHandler, DEFAULT_CLIENT); } @@ -75,7 +74,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) { - logger.debug("Starting SSH to {} on channel: {}", address, ctx.channel()); + LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel()); final ConnectFuture sshConnectionFuture = sshClient.connect(authenticationHandler.getUsername(), address); sshConnectionFuture.addListener(new SshFutureListener() { @@ -92,7 +91,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private synchronized void handleSshSessionCreated(final ConnectFuture future, final ChannelHandlerContext ctx) { try { - logger.trace("SSH session created on channel: {}", ctx.channel()); + LOG.trace("SSH session created on channel: {}", ctx.channel()); session = future.getSession(); final AuthFuture authenticateFuture = authenticationHandler.authenticate(session); @@ -102,7 +101,11 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { if (future.isSuccess()) { handleSshAuthenticated(session, ctx); } else { - handleSshSetupFailure(ctx, future.getException()); + // Exception does not have to be set in the future, add simple exception in such case + final Throwable exception = future.getException() == null ? + new IllegalStateException("Authentication failed") : + future.getException(); + handleSshSetupFailure(ctx, exception); } } }); @@ -113,7 +116,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private synchronized void handleSshAuthenticated(final ClientSession session, final ChannelHandlerContext ctx) { try { - logger.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion()); + LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion()); channel = session.createSubsystemChannel(SUBSYSTEM); channel.setStreaming(ClientChannel.Streaming.Async); @@ -135,54 +138,49 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) { - logger.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel()); + LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel()); connectPromise.setSuccess(); - connectPromise = null; - ctx.fireChannelActive(); - final IoInputStream asyncOut = channel.getAsyncOut(); - sshReadAsyncListener = new SshReadAsyncListener(ctx, asyncOut); - } - - private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { - logger.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e); - connectPromise.setFailure(e); - connectPromise = null; - throw new IllegalStateException("Unable to setup SSH connection on channel: " + ctx.channel(), e); - } + // TODO we should also read from error stream and at least log from that - @Override - public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - try { - if(channel.getAsyncIn().isClosed() || channel.getAsyncIn().isClosing()) { - handleSshSessionClosed(ctx); - } else { - channel.getAsyncIn().write(toBuffer(msg)); - ((ByteBuf) msg).release(); + sshReadAsyncListener = new AsyncSshHandlerReader(new AutoCloseable() { + @Override + public void close() throws Exception { + AsyncSshHandler.this.disconnect(ctx, ctx.newPromise()); } - } catch (final Exception e) { - logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e); - throw new IllegalStateException("Exception while writing to SSH remote on channel " + ctx.channel(),e); + }, new AsyncSshHandlerReader.ReadMsgHandler() { + @Override + public void onMessageRead(final ByteBuf msg) { + ctx.fireChannelRead(msg); + } + }, channel.toString(), channel.getAsyncOut()); + + // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null + if(channel != null) { + sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn()); + ctx.fireChannelActive(); } } - private static void handleSshSessionClosed(final ChannelHandlerContext ctx) { - logger.debug("SSH session closed on channel: {}", ctx.channel()); - ctx.fireChannelInactive(); + private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { + LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e); + disconnect(ctx, ctx.newPromise()); + + // If the promise is not yet done, we have failed with initial connect and set connectPromise to failure + if(!connectPromise.isDone()) { + connectPromise.setFailure(e); + } } - private Buffer toBuffer(final Object msg) { - // TODO Buffer vs ByteBuf translate, Can we handle that better ? - Preconditions.checkState(msg instanceof ByteBuf); - final ByteBuf byteBuf = (ByteBuf) msg; - final byte[] temp = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(temp, 0, byteBuf.readableBytes()); - return new Buffer(temp); + @Override + public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { + sshWriteAsyncHandler.write(ctx, msg, promise); } @Override public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception { + LOG.debug("SSH session connecting on channel {}. promise: {} ", ctx.channel(), connectPromise); this.connectPromise = promise; startSsh(ctx, remoteAddress); } @@ -193,75 +191,47 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } @Override - public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception { - if(sshReadAsyncListener != null) { - sshReadAsyncListener.close(); - } - - session.close(false).addListener(new SshFutureListener() { - @Override - public void operationComplete(final CloseFuture future) { - if(future.isClosed() == false) { - session.close(true); - } - session = null; - } - }); - - channel = null; - } - - /** - * Listener over async input stream from SSH session. - * This listeners schedules reads in a loop until the session is closed or read fails. - */ - private static class SshReadAsyncListener implements SshFutureListener, AutoCloseable { - private static final int BUFFER_SIZE = 8192; + public synchronized void disconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) { + LOG.trace("Closing SSH session on channel: {} with connect promise in state: {}", ctx.channel(), connectPromise); - private final ChannelHandlerContext ctx; - - private IoInputStream asyncOut; - private Buffer buf; - private IoReadFuture currentReadFuture; + // If we have already succeeded and the session was dropped after, we need to fire inactive to notify reconnect logic + if(connectPromise.isSuccess()) { + ctx.fireChannelInactive(); + } - public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) { - this.ctx = ctx; - this.asyncOut = asyncOut; - buf = new Buffer(BUFFER_SIZE); - asyncOut.read(buf).addListener(this); + if(sshWriteAsyncHandler != null) { + sshWriteAsyncHandler.close(); } - @Override - public synchronized void operationComplete(final IoReadFuture future) { - if(future.getException() != null) { + if(sshReadAsyncListener != null) { + sshReadAsyncListener.close(); + } - if(asyncOut.isClosed() || asyncOut.isClosing()) { - // We are closing - handleSshSessionClosed(ctx); - } else { - logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); - throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException()); + if(session!= null && !session.isClosed() && !session.isClosing()) { + session.close(false).addListener(new SshFutureListener() { + @Override + public void operationComplete(final CloseFuture future) { + if (future.isClosed() == false) { + session.close(true); + } + session = null; } - } - - if (future.getRead() > 0) { - ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead())); - - // Schedule next read - buf = new Buffer(BUFFER_SIZE); - currentReadFuture = asyncOut.read(buf); - currentReadFuture.addListener(this); - } + }); } - @Override - public synchronized void close() throws Exception { - // Remove self as listener on close to prevent reading from closed input - if(currentReadFuture != null) { - currentReadFuture.removeListener(this); - } - - asyncOut = null; + // Super disconnect is necessary in this case since we are using NioSocketChannel and it needs to cleanup its resources + // e.g. Socket that it tries to open in its constructor (https://bugs.opendaylight.org/show_bug.cgi?id=2430) + // TODO better solution would be to implement custom ChannelFactory + Channel that will use mina SSH lib internally: port this to custom channel implementation + try { + // Disconnect has to be closed after inactive channel event was fired, because it interferes with it + super.disconnect(ctx, ctx.newPromise()); + } catch (final Exception e) { + LOG.warn("Unable to cleanup all resources for channel: {}. Ignoring.", ctx.channel(), e); } + + channel = null; + promise.setSuccess(); + LOG.debug("SSH session closed on channel: {}", ctx.channel()); } + }