import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Queue;
-import org.apache.sshd.common.future.SshFutureListener;
import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoWriteFuture;
import org.apache.sshd.common.io.WritePendingException;
import org.apache.sshd.common.util.buffer.Buffer;
import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.checkerframework.checker.lock.qual.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// TODO implement Limiting mechanism for pending writes
// But there is a possible issue with limiting:
// 1. What to do when queue is full ? Immediate Fail for every request ?
- // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
- // when we send/queue 1 chunk and fail the other chunks
+ // 2. At this level we might be dealing with Chunks of messages(not whole messages)
+ // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks
+ private final Object asyncInLock = new Object();
private volatile IoOutputStream asyncIn;
// Order has to be preserved for queued writes
this.asyncIn = asyncIn;
}
+ @GuardedBy("asyncInLock")
+ private boolean isWriteExecuted = false;
+
public void write(final ChannelHandlerContext ctx,
final Object msg, final ChannelPromise promise) {
if (asyncIn == null) {
// writes and pending writes would lock the underlyinch channel session
// window resize write would try to write the message on an already locked channelSession
// while the pending write was in progress from the write callback
- synchronized (asyncIn) {
+ synchronized (asyncInLock) {
// TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
// If we are closed/closing, set immediate fail
if (asyncIn.isClosed() || asyncIn.isClosing()) {
promise.setFailure(new IllegalStateException("Channel closed"));
} else {
final ByteBuf byteBufMsg = (ByteBuf) msg;
- if (pending.isEmpty() == false) {
+ if (isWriteExecuted) {
queueRequest(ctx, byteBufMsg, promise);
return;
}
//sending message with pending
//if resending message not succesfull, then attribute wasPending is true
- private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) {
+ private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise,
+ final ByteBuf byteBufMsg, final boolean wasPending) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
}
- asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
-
- @Override
- public void operationComplete(final IoWriteFuture future) {
- // synchronized block due to deadlock that happens on ssh window resize
- // writes and pending writes would lock the underlyinch channel session
- // window resize write would try to write the message on an already locked channelSession,
- // while the pending write was in progress from the write callback
- synchronized (asyncIn) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
- ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
- }
-
- // Notify success or failure
- if (future.isWritten()) {
- promise.setSuccess();
- } else {
- LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
- promise.setFailure(future.getException());
- }
-
- // Not needed anymore, release
- byteBufMsg.release();
-
- //rescheduling message from queue after successfully sent
- if (wasPending) {
- byteBufMsg.resetReaderIndex();
- pending.remove();
- }
+
+ isWriteExecuted = true;
+
+ asyncIn.writePacket(toBuffer(byteBufMsg)).addListener(future -> {
+ // synchronized block due to deadlock that happens on ssh window resize
+ // writes and pending writes would lock the underlyinch channel session
+ // window resize write would try to write the message on an already locked channelSession,
+ // while the pending write was in progress from the write callback
+ synchronized (asyncInLock) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+ ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
}
- // Check pending queue and schedule next
- // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
- writePendingIfAny();
+ // Notify success or failure
+ if (future.isWritten()) {
+ promise.setSuccess();
+ } else {
+ LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
+ byteBufToString(byteBufMsg), future.getException());
+ promise.setFailure(future.getException());
+ }
+
+ //rescheduling message from queue after successfully sent
+ if (wasPending) {
+ byteBufMsg.resetReaderIndex();
+ pending.remove();
+ }
+
+ // Not needed anymore, release
+ byteBufMsg.release();
}
- });
- } catch (final WritePendingException e) {
+ // Check pending queue and schedule next
+ // At this time we are guaranteed that we are not in pending state anymore
+ // so the next request should succeed
+ writePendingIfAny();
+ });
- if(wasPending == false){
+ } catch (final IOException | WritePendingException e) {
+ if (!wasPending) {
queueRequest(ctx, byteBufMsg, promise);
}
}
}
private void writePendingIfAny() {
- synchronized (asyncIn) {
+ synchronized (asyncInLock) {
if (pending.peek() == null) {
+ isWriteExecuted = false;
return;
}
final PendingWriteRequest pendingWrite = pending.peek();
final ByteBuf msg = pendingWrite.msg;
if (LOG.isTraceEnabled()) {
- LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+ LOG.trace("Writing pending request on channel: {}, message: {}",
+ pendingWrite.ctx.channel(), byteBufToString(msg));
}
writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
}
new PendingWriteRequest(ctx, msg, promise).pend(pending);
// } catch (final Exception ex) {
-// LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+// LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
+// ctx.channel(), ex, byteBufToString(msg));
// msg.release();
// promise.setFailure(ex);
// }
}
private static Buffer toBuffer(final ByteBuf msg) {
- // FIXME: Translation from ByteBuf to Buffer. Buffer is an abstract class, so based on the assumptions
- // we can make about the contents of ByteBuf, we should be able to skip copying byte arrays around
- // by creating an appropriate subclass.
-
+ // TODO Buffer vs ByteBuf translate, Can we handle that better ?
msg.resetReaderIndex();
final byte[] temp = new byte[msg.readableBytes()];
msg.readBytes(temp, 0, msg.readableBytes());
private final ByteBuf msg;
private final ChannelPromise promise;
- public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+ PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
this.ctx = ctx;
// Reset reader index, last write (failed) attempt moved index to the end
msg.resetReaderIndex();
// Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
// "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
// pending.size(), ctx.channel());
- Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s",
- pending.size(), ctx.channel());
+ Preconditions.checkState(pending.offer(this),
+ "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
}
}
}