// 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
// when we send/queue 1 chunk and fail the other chunks
- private IoOutputStream asyncIn;
+ private volatile IoOutputStream asyncIn;
// Order has to be preserved for queued writes
private final Deque<PendingWriteRequest> pending = new LinkedList<>();
this.asyncIn = asyncIn;
}
- public synchronized void write(final ChannelHandlerContext ctx,
+ public void write(final ChannelHandlerContext ctx,
final Object msg, final ChannelPromise promise) {
- // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
- // If we are closed/closing, set immediate fail
- if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+ if (asyncIn == null) {
promise.setFailure(new IllegalStateException("Channel closed"));
- } else {
- final ByteBuf byteBufMsg = (ByteBuf) msg;
- if (pending.isEmpty() == false) {
- queueRequest(ctx, byteBufMsg, promise);
- return;
- }
+ return;
+ }
+ // synchronized block due to deadlock that happens on ssh window resize
+ // writes and pending writes would lock the underlyinch channel session
+ // window resize write would try to write the message on an already locked channelSession
+ // while the pending write was in progress from the write callback
+ synchronized (asyncIn) {
+ // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
+ // If we are closed/closing, set immediate fail
+ if (asyncIn.isClosed() || asyncIn.isClosing()) {
+ promise.setFailure(new IllegalStateException("Channel closed"));
+ } else {
+ final ByteBuf byteBufMsg = (ByteBuf) msg;
+ if (pending.isEmpty() == false) {
+ queueRequest(ctx, byteBufMsg, promise);
+ return;
+ }
- writeWithPendingDetection(ctx, promise, byteBufMsg, false);
+ writeWithPendingDetection(ctx, promise, byteBufMsg, false);
+ }
}
}
@Override
public void operationComplete(final IoWriteFuture future) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
- ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
- }
+ // synchronized block due to deadlock that happens on ssh window resize
+ // writes and pending writes would lock the underlyinch channel session
+ // window resize write would try to write the message on an already locked channelSession,
+ // while the pending write was in progress from the write callback
+ synchronized (asyncIn) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+ ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+ }
- // Notify success or failure
- if (future.isWritten()) {
- promise.setSuccess();
- } else {
- LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
- promise.setFailure(future.getException());
- }
+ // Notify success or failure
+ if (future.isWritten()) {
+ promise.setSuccess();
+ } else {
+ LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+ promise.setFailure(future.getException());
+ }
- // Not needed anymore, release
- byteBufMsg.release();
+ // Not needed anymore, release
+ byteBufMsg.release();
- synchronized (AsyncSshHandlerWriter.this) {
//rescheduling message from queue after successfully sent
if (wasPending) {
byteBufMsg.resetReaderIndex();
// Check pending queue and schedule next
// At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
writePendingIfAny();
-
}
});
}
}
- private synchronized void writePendingIfAny() {
+ private void writePendingIfAny() {
+ synchronized (asyncIn) {
+ if (pending.peek() == null) {
+ return;
+ }
- if (pending.peek() == null) {
- return;
- }
+ final PendingWriteRequest pendingWrite = pending.peek();
+ final ByteBuf msg = pendingWrite.msg;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+ }
- final PendingWriteRequest pendingWrite = pending.peek();
- final ByteBuf msg = pendingWrite.msg;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+ writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
}
-
- writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
}
public static String byteBufToString(final ByteBuf msg) {
}
@Override
- public synchronized void close() {
+ public void close() {
asyncIn = null;
}