X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandler.java;h=14d753f1f8b59c25a7ea871e3bfe5e430c1d9970;hp=3d1e4784f2ded5677472e3a25d99adaa11e24cf9;hb=a2563a94253f9c2603e0ab25b8f412ea07fcf51d;hpb=b5b204bafd8ee18692fc023cb2eae6e123369340 diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java index 3d1e4784f2..14d753f1f8 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandler.java @@ -10,9 +10,7 @@ package org.opendaylight.controller.netconf.nettyutil.handler.ssh.client; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import java.io.IOException; @@ -25,12 +23,6 @@ import org.apache.sshd.client.future.ConnectFuture; import org.apache.sshd.client.future.OpenFuture; import org.apache.sshd.common.future.CloseFuture; import org.apache.sshd.common.future.SshFutureListener; -import org.apache.sshd.common.io.IoInputStream; -import org.apache.sshd.common.io.IoOutputStream; -import org.apache.sshd.common.io.IoReadFuture; -import org.apache.sshd.common.io.IoWriteFuture; -import org.apache.sshd.common.io.WritePendingException; -import org.apache.sshd.common.util.Buffer; import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.AuthenticationHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +32,7 @@ import org.slf4j.LoggerFactory; */ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { - private static final Logger logger = LoggerFactory.getLogger(AsyncSshHandler.class); + private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandler.class); public static final String SUBSYSTEM = "netconf"; public static final SshClient DEFAULT_CLIENT = SshClient.setUpDefaultClient(); @@ -56,8 +48,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private final AuthenticationHandler authenticationHandler; private final SshClient sshClient; - private SshReadAsyncListener sshReadAsyncListener; - private SshWriteAsyncHandler sshWriteAsyncHandler; + private AsyncSshHandlerReader sshReadAsyncListener; + private AsyncSshHandlerWriter sshWriteAsyncHandler; private ClientChannel channel; private ClientSession session; @@ -82,7 +74,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } private void startSsh(final ChannelHandlerContext ctx, final SocketAddress address) { - logger.debug("Starting SSH to {} on channel: {}", address, ctx.channel()); + LOG.debug("Starting SSH to {} on channel: {}", address, ctx.channel()); final ConnectFuture sshConnectionFuture = sshClient.connect(authenticationHandler.getUsername(), address); sshConnectionFuture.addListener(new SshFutureListener() { @@ -99,7 +91,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private synchronized void handleSshSessionCreated(final ConnectFuture future, final ChannelHandlerContext ctx) { try { - logger.trace("SSH session created on channel: {}", ctx.channel()); + LOG.trace("SSH session created on channel: {}", ctx.channel()); session = future.getSession(); final AuthFuture authenticateFuture = authenticationHandler.authenticate(session); @@ -120,7 +112,7 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { private synchronized void handleSshAuthenticated(final ClientSession session, final ChannelHandlerContext ctx) { try { - logger.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion()); + LOG.debug("SSH session authenticated on channel: {}, server version: {}", ctx.channel(), session.getServerVersion()); channel = session.createSubsystemChannel(SUBSYSTEM); channel.setStreaming(ClientChannel.Streaming.Async); @@ -142,21 +134,34 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { } private synchronized void handleSshChanelOpened(final ChannelHandlerContext ctx) { - logger.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel()); + LOG.trace("SSH subsystem channel opened successfully on channel: {}", ctx.channel()); connectPromise.setSuccess(); connectPromise = null; - sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut()); + // TODO we should also read from error stream and at least log from that + + sshReadAsyncListener = new AsyncSshHandlerReader(new AutoCloseable() { + @Override + public void close() throws Exception { + AsyncSshHandler.this.disconnect(ctx, ctx.newPromise()); + } + }, new AsyncSshHandlerReader.ReadMsgHandler() { + @Override + public void onMessageRead(final ByteBuf msg) { + ctx.fireChannelRead(msg); + } + }, channel.toString(), channel.getAsyncOut()); + // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null if(channel != null) { - sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn()); + sshWriteAsyncHandler = new AsyncSshHandlerWriter(channel.getAsyncIn()); ctx.fireChannelActive(); } } private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) { - logger.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e); + LOG.warn("Unable to setup SSH connection on channel: {}", ctx.channel(), e); connectPromise.setFailure(e); connectPromise = null; throw new IllegalStateException("Unable to setup SSH connection on channel: " + ctx.channel(), e); @@ -203,182 +208,8 @@ public class AsyncSshHandler extends ChannelOutboundHandlerAdapter { channel = null; promise.setSuccess(); - logger.debug("SSH session closed on channel: {}", ctx.channel()); + LOG.debug("SSH session closed on channel: {}", ctx.channel()); ctx.fireChannelInactive(); } - /** - * Listener over async input stream from SSH session. - * This listeners schedules reads in a loop until the session is closed or read fails. - */ - private static class SshReadAsyncListener implements SshFutureListener, AutoCloseable { - private static final int BUFFER_SIZE = 8192; - - private final ChannelOutboundHandler asyncSshHandler; - private final ChannelHandlerContext ctx; - - private IoInputStream asyncOut; - private Buffer buf; - private IoReadFuture currentReadFuture; - - public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) { - this.asyncSshHandler = asyncSshHandler; - this.ctx = ctx; - this.asyncOut = asyncOut; - buf = new Buffer(BUFFER_SIZE); - asyncOut.read(buf).addListener(this); - } - - @Override - public synchronized void operationComplete(final IoReadFuture future) { - if(future.getException() != null) { - if(asyncOut.isClosed() || asyncOut.isClosing()) { - // Ssh dropped - logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException()); - } else { - logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException()); - } - invokeDisconnect(); - return; - } - - if (future.getRead() > 0) { - ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead())); - - // Schedule next read - buf = new Buffer(BUFFER_SIZE); - currentReadFuture = asyncOut.read(buf); - currentReadFuture.addListener(this); - } - } - - private void invokeDisconnect() { - try { - asyncSshHandler.disconnect(ctx, ctx.newPromise()); - } catch (final Exception e) { - // This should not happen - throw new IllegalStateException(e); - } - } - - @Override - public synchronized void close() { - // Remove self as listener on close to prevent reading from closed input - if(currentReadFuture != null) { - currentReadFuture.removeListener(this); - } - - asyncOut = null; - } - } - - private static final class SshWriteAsyncHandler implements AutoCloseable { - public static final int MAX_PENDING_WRITES = 100; - - private final ChannelOutboundHandler channelHandler; - private IoOutputStream asyncIn; - - // Counter that holds the amount of pending write messages - // Pending write can occur in case remote window is full - // In such case, we need to wait for the pending write to finish - private int pendingWriteCounter; - // Last write future, that can be pending - private IoWriteFuture lastWriteFuture; - - public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) { - this.channelHandler = channelHandler; - this.asyncIn = asyncIn; - } - - int c = 0; - - public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - try { - if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) { - // If we are closed/closing, set immediate fail - promise.setFailure(new IllegalStateException("Channel closed")); - } else { - lastWriteFuture = asyncIn.write(toBuffer(msg)); - lastWriteFuture.addListener(new SshFutureListener() { - - @Override - public void operationComplete(final IoWriteFuture future) { - ((ByteBuf) msg).release(); - - // Notify success or failure - if (future.isWritten()) { - promise.setSuccess(); - } else { - promise.setFailure(future.getException()); - } - - // Reset last pending future - synchronized (SshWriteAsyncHandler.this) { - lastWriteFuture = null; - } - } - }); - } - } catch (final WritePendingException e) { - // Check limit for pending writes - pendingWriteCounter++; - if(pendingWriteCounter > MAX_PENDING_WRITES) { - promise.setFailure(e); - handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() + - ", remote window is not getting read or is too small")); - } - - // We need to reset buffer read index, since we've already read it when we tried to write it the first time - ((ByteBuf) msg).resetReaderIndex(); - logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter); - - // In case of pending, re-invoke write after pending is finished - Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e); - lastWriteFuture.addListener(new SshFutureListener() { - @Override - public void operationComplete(final IoWriteFuture future) { - // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first - // External thread could trigger write on this instance while we are on this line - // Verify - if (future.isWritten()) { - synchronized (SshWriteAsyncHandler.this) { - // Pending done, decrease counter - pendingWriteCounter--; - write(ctx, msg, promise); - } - } else { - // Cannot reschedule pending, fail - handlePendingFailed(ctx, e); - } - } - - }); - } - } - - private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) { - logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e); - try { - channelHandler.disconnect(ctx, ctx.newPromise()); - } catch (final Exception ex) { - // This should not happen - throw new IllegalStateException(ex); - } - } - - @Override - public void close() { - asyncIn = null; - } - - private Buffer toBuffer(final Object msg) { - // TODO Buffer vs ByteBuf translate, Can we handle that better ? - Preconditions.checkState(msg instanceof ByteBuf); - final ByteBuf byteBuf = (ByteBuf) msg; - final byte[] temp = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(temp, 0, byteBuf.readableBytes()); - return new Buffer(temp); - } - - } }