- /**
- * Listener over async input stream from SSH session.
- * This listeners schedules reads in a loop until the session is closed or read fails.
- */
- private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
- private static final int BUFFER_SIZE = 8192;
-
- private final ChannelOutboundHandler asyncSshHandler;
- private final ChannelHandlerContext ctx;
-
- private IoInputStream asyncOut;
- private Buffer buf;
- private IoReadFuture currentReadFuture;
-
- public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
- this.asyncSshHandler = asyncSshHandler;
- this.ctx = ctx;
- this.asyncOut = asyncOut;
- buf = new Buffer(BUFFER_SIZE);
- asyncOut.read(buf).addListener(this);
- }
-
- @Override
- public synchronized void operationComplete(final IoReadFuture future) {
- if(future.getException() != null) {
-
- if(asyncOut.isClosed() || asyncOut.isClosing()) {
-
- // Ssh dropped
- logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
- invokeDisconnect();
- return;
- } else {
- logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- invokeDisconnect();
- }
- }
-
- if (future.getRead() > 0) {
- ctx.fireChannelRead(Unpooled.wrappedBuffer(buf.array(), 0, future.getRead()));
-
- // Schedule next read
- buf = new Buffer(BUFFER_SIZE);
- currentReadFuture = asyncOut.read(buf);
- currentReadFuture.addListener(this);
- }
- }
-
- private void invokeDisconnect() {
- try {
- asyncSshHandler.disconnect(ctx, ctx.newPromise());
- } catch (final Exception e) {
- // This should not happen
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public synchronized void close() {
- // Remove self as listener on close to prevent reading from closed input
- if(currentReadFuture != null) {
- currentReadFuture.removeListener(this);
- }
-
- asyncOut = null;
- }
- }
-
- private static final class SshWriteAsyncHandler implements AutoCloseable {
- public static final int MAX_PENDING_WRITES = 100;
-
- private final ChannelOutboundHandler channelHandler;
- private IoOutputStream asyncIn;
-
- // Counter that holds the amount of pending write messages
- // Pending write can occur in case remote window is full
- // In such case, we need to wait for the pending write to finish
- private int pendingWriteCounter;
- // Last write future, that can be pending
- private IoWriteFuture lastWriteFuture;
-
- public SshWriteAsyncHandler(final ChannelOutboundHandler channelHandler, final IoOutputStream asyncIn) {
- this.channelHandler = channelHandler;
- this.asyncIn = asyncIn;
- }
-
- int c = 0;
-
- public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
- try {
- if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
- // If we are closed/closing, set immediate fail
- promise.setFailure(new IllegalStateException("Channel closed"));
- } else {
- lastWriteFuture = asyncIn.write(toBuffer(msg));
- lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
-
- @Override
- public void operationComplete(final IoWriteFuture future) {
- ((ByteBuf) msg).release();
-
- // Notify success or failure
- if (future.isWritten()) {
- promise.setSuccess();
- } else {
- promise.setFailure(future.getException());
- }
-
- // Reset last pending future
- synchronized (SshWriteAsyncHandler.this) {
- lastWriteFuture = null;
- }
- }
- });
- }
- } catch (final WritePendingException e) {
- // Check limit for pending writes
- pendingWriteCounter++;
- if(pendingWriteCounter > MAX_PENDING_WRITES) {
- handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
- ", remote window is not getting read or is too small"));
- }
-
- logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
-
- // In case of pending, re-invoke write after pending is finished
- lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(final IoWriteFuture future) {
- if (future.isWritten()) {
- synchronized (SshWriteAsyncHandler.this) {
- // Pending done, decrease counter
- pendingWriteCounter--;
- }
- write(ctx, msg, promise);
- } else {
- // Cannot reschedule pending, fail
- handlePendingFailed(ctx, e);
- }
- }
-
- });
- }
- }
-
- private void handlePendingFailed(final ChannelHandlerContext ctx, final Exception e) {
- logger.warn("Exception while writing to SSH remote on channel {}", ctx.channel(), e);
- try {
- channelHandler.disconnect(ctx, ctx.newPromise());
- } catch (final Exception ex) {
- // This should not happen
- throw new IllegalStateException(ex);
- }
- }
-
- @Override
- public void close() {
- asyncIn = null;
- }
-
- private Buffer toBuffer(final Object msg) {
- // TODO Buffer vs ByteBuf translate, Can we handle that better ?
- Preconditions.checkState(msg instanceof ByteBuf);
- final ByteBuf byteBuf = (ByteBuf) msg;
- final byte[] temp = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(temp, 0, byteBuf.readableBytes());
- return new Buffer(temp);
- }
-
- }