connectPromise = null;
sshReadAsyncListener = new SshReadAsyncListener(this, ctx, channel.getAsyncOut());
- sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
-
- ctx.fireChannelActive();
+ // if readAsyncListener receives immediate close, it will close this handler and closing this handler sets channel variable to null
+ if(channel != null) {
+ sshWriteAsyncHandler = new SshWriteAsyncHandler(this, channel.getAsyncIn());
+ ctx.fireChannelActive();
+ }
}
private synchronized void handleSshSetupFailure(final ChannelHandlerContext ctx, final Throwable e) {
@Override
public synchronized void operationComplete(final IoReadFuture future) {
if(future.getException() != null) {
-
if(asyncOut.isClosed() || asyncOut.isClosing()) {
-
// Ssh dropped
logger.debug("Ssh session dropped on channel: {}", ctx.channel(), future.getException());
- invokeDisconnect();
- return;
} else {
logger.warn("Exception while reading from SSH remote on channel {}", ctx.channel(), future.getException());
- invokeDisconnect();
}
+ invokeDisconnect();
+ return;
}
if (future.getRead() > 0) {
// Check limit for pending writes
pendingWriteCounter++;
if(pendingWriteCounter > MAX_PENDING_WRITES) {
+ promise.setFailure(e);
handlePendingFailed(ctx, new IllegalStateException("Too much pending writes(" + MAX_PENDING_WRITES + ") on channel: " + ctx.channel() +
", remote window is not getting read or is too small"));
}
+ // We need to reset buffer read index, since we've already read it when we tried to write it the first time
+ ((ByteBuf) msg).resetReaderIndex();
logger.debug("Write pending to SSH remote on channel: {}, current pending count: {}", ctx.channel(), pendingWriteCounter);
// In case of pending, re-invoke write after pending is finished
+ Preconditions.checkNotNull(lastWriteFuture, "Write is pending, but there was no previous write attempt", e);
lastWriteFuture.addListener(new SshFutureListener<IoWriteFuture>() {
@Override
public void operationComplete(final IoWriteFuture future) {
+ // FIXME possible minor race condition, we cannot guarantee that this callback when pending is finished will be executed first
+ // External thread could trigger write on this instance while we are on this line
+ // Verify
if (future.isWritten()) {
synchronized (SshWriteAsyncHandler.this) {
// Pending done, decrease counter
pendingWriteCounter--;
+ write(ctx, msg, promise);
}
- write(ctx, msg, promise);
} else {
// Cannot reschedule pending, fail
handlePendingFailed(ctx, e);