return;
}
- writeWithPendingDetection(ctx, promise, byteBufMsg);
+ writeWithPendingDetection(ctx, promise, byteBufMsg, false);
}
}
- private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
+ //sending message with pending
+ //if resending message not succesfull, then attribute wasPending is true
+ private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, boolean wasPending) {
try {
+
if (LOG.isTraceEnabled()) {
LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
}
public void operationComplete(final IoWriteFuture future) {
if (LOG.isTraceEnabled()) {
LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
- ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+ ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
}
// Notify success or failure
// Check pending queue and schedule next
// At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
writePendingIfAny();
+
}
});
+
+ //rescheduling message from queue after successfully sent
+ if(wasPending){
+ byteBufMsg.resetReaderIndex();
+ pending.remove();
+ }
+
} catch (final WritePendingException e) {
- queueRequest(ctx, byteBufMsg, promise);
+
+ if(wasPending == false){
+ queueRequest(ctx, byteBufMsg, promise);
+ }
}
}
private synchronized void writePendingIfAny() {
+
if (pending.peek() == null) {
return;
}
- // In case of pending, reschedule next message from queue
- final PendingWriteRequest pendingWrite = pending.poll();
+ final PendingWriteRequest pendingWrite = pending.peek();
final ByteBuf msg = pendingWrite.msg;
if (LOG.isTraceEnabled()) {
LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
}
- writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
+ writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
}
public static String byteBufToString(final ByteBuf msg) {
- msg.resetReaderIndex();
final String s = msg.toString(Charsets.UTF_8);
msg.resetReaderIndex();
return s;
private Buffer toBuffer(final ByteBuf msg) {
// TODO Buffer vs ByteBuf translate, Can we handle that better ?
+ msg.resetReaderIndex();
final byte[] temp = new byte[msg.readableBytes()];
msg.readBytes(temp, 0, msg.readableBytes());
return new Buffer(temp);