*/
public final class AsyncSshHandlerWriter implements AutoCloseable {
- private static final Logger logger = LoggerFactory
+ private static final Logger LOG = LoggerFactory
.getLogger(AsyncSshHandlerWriter.class);
// public static final int MAX_PENDING_WRITES = 1000;
private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
try {
- if (logger.isTraceEnabled()) {
- logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
}
asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
- @Override
- public void operationComplete(final IoWriteFuture future) {
- if (logger.isTraceEnabled()) {
- logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+ @Override
+ public void operationComplete(final IoWriteFuture future) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
- }
-
- // Notify success or failure
- if (future.isWritten()) {
- promise.setSuccess();
- } else {
- logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
- promise.setFailure(future.getException());
- }
-
- // Not needed anymore, release
- byteBufMsg.release();
-
- // Check pending queue and schedule next
- // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
- writePendingIfAny();
- }
- });
+ }
+
+ // Notify success or failure
+ if (future.isWritten()) {
+ promise.setSuccess();
+ } else {
+ LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+ promise.setFailure(future.getException());
+ }
+
+ // Not needed anymore, release
+ byteBufMsg.release();
+
+ // Check pending queue and schedule next
+ // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
+ writePendingIfAny();
+ }
+ });
} catch (final WritePendingException e) {
queueRequest(ctx, byteBufMsg, promise);
}
// In case of pending, reschedule next message from queue
final PendingWriteRequest pendingWrite = pending.poll();
final ByteBuf msg = pendingWrite.msg;
- if (logger.isTraceEnabled()) {
- logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
}
writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
// try {
- logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
- if (logger.isTraceEnabled()) {
- logger.trace("Queueing request due to pending: {}", byteBufToString(msg));
+ LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
}
new PendingWriteRequest(ctx, msg, promise).pend(pending);
// } catch (final Exception ex) {
-// logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+// LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
// msg.release();
// promise.setFailure(ex);
// }