connectPromise.setSuccess();
connectPromise = null;
- sshReadAsyncListener = new SshReadAsyncListener(ctx, channel.getAsyncOut());
- sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
-
- ctx.fireChannelActive();
+ sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
+ // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+ if(channel != null) {
+ sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+ ctx.fireChannelActive();
+ }
}
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
sshWriteAsyncHandler.write(ctx, msg, promise);
}
- private static void handleSshSessionClosed(final ChannelHandlerContext ctx) {
- logger.debug("SSH session closed on channel: {}", ctx.channel());
- ctx.fireChannelInactive();
- }
-
@Override
public synchronized void connect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) throws Exception {
this.connectPromise = promise;
channel = null;
promise.setSuccess();
- handleSshSessionClosed(ctx);
+ logger.debug("SSH session closed on channel: {}", ctx.channel());
+ ctx.fireChannelInactive();
}
/**
private static class SshReadAsyncListener implements SshFutureListener<IoReadFuture>, AutoCloseable {
private static final int BUFFER_SIZE = 8192;
+ private final ChannelOutboundHandler asyncSshHandler;
private final ChannelHandlerContext ctx;
private IoInputStream asyncOut;
private Buffer buf;
private IoReadFuture currentReadFuture;
- public SshReadAsyncListener(final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ public SshReadAsyncListener(final ChannelOutboundHandler asyncSshHandler, final ChannelHandlerContext ctx, final IoInputStream asyncOut) {
+ this.asyncSshHandler = asyncSshHandler;
this.ctx = ctx;
this.asyncOut = asyncOut;
buf = new Buffer(BUFFER_SIZE);
@Override
public synchronized void operationComplete(final IoReadFuture future) {
if(future.getException() != null) {
-
if(asyncOut.isClosed() || asyncOut.isClosing()) {
- // We are closing
- handleSshSessionClosed(ctx);
+ // Ssh dropped
+ logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
} else {
logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- throw new IllegalStateException("Exception while reading from SSH remote on channel " + ctx.channel(), future.getException());
}
+ invokeDisconnect();
+ return;
}
if (future.getRead() > 0) {
}
}
+ private void invokeDisconnect() {
+ try {
+ asyncSshHandler.disconnect(ctx, ctx.newPromise());
+ } catch (final Exception e) {
+ // This should not happen
+ throw new IllegalStateException(e);
+ }
+ }
+
@Override
public synchronized void close() {
// Remove self as listener on close to prevent reading from closed input
this.asyncIn = asyncIn;
}
+ int c = 0;
+
public synchronized void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
try {
- if(asyncIn.isClosed() || asyncIn.isClosing()) {
- handleSshSessionClosed(ctx);
+ if(asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+ // If we are closed/closing, set immediate fail
+ promise.setFailure(new IllegalStateException("Channel closed"));
} else {
lastWriteFuture = asyncIn.write(toBuffer(msg));
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
// Notify success or failure
if (future.isWritten()) {
promise.setSuccess();
+ } else {
+ promise.setFailure(future.getException());
}
- promise.setFailure(future.getException());
// Reset last pending future
synchronized (SshWriteAsyncHandler.this) {
// Check limit for pending writes
pendingWriteCounter++;
if(pendingWriteCounter > MAX_PENDING_WRITES) {
+ promise.setFailure(e);
handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
", remote window is not getting read or is too small"));
}
+ // We need to reset buffer read index, since we've already read it when we tried to write it the first time
+ ((ByteBuf) msg).resetReaderIndex();
logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
// In case of pending, re-invoke write after pending is finished
+ Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(final IoWriteFuture future) {
- if(future.isWritten()) {
+ // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
+ // External thread could trigger write on this instance while we are on this line
+ // Verify
+ if (future.isWritten()) {
synchronized (SshWriteAsyncHandler.this) {
// Pending done, decrease counter
pendingWriteCounter--;
+ write(ctx, msg, promise);
}
- write(ctx, msg, promise);
} else {
// Cannot reschedule pending, fail
handlePendingFailed(ctx, e);