From 047e6f42c08a0d47a8a62fa7f94eddb88b4d16f2 Mon Sep 17 00:00:00 2001 From: Marian Dubai Date: Thu, 4 Dec 2014 10:03:41 +0100 Subject: [PATCH] Bug-2342: Fixing rpc-reply messages id's get mixed up Change-Id: If59b1dd30c6552efa93df7bb8d776396a9c7bafb Signed-off-by: Marian Dubai --- .../ssh/client/AsyncSshHandlerWriter.java | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index 1e976ce6a1..757dc1a2ba 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -62,12 +62,15 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { return; } - writeWithPendingDetection(ctx, promise, byteBufMsg); + writeWithPendingDetection(ctx, promise, byteBufMsg, false); } } - private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) { + //sending message with pending + //if resending message not succesfull, then attribute wasPending is true + private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, boolean wasPending) { try { + if (LOG.isTraceEnabled()) { LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg)); } @@ -77,7 +80,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { public void operationComplete(final IoWriteFuture future) { if (LOG.isTraceEnabled()) { LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}", - ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); + ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg)); } // Notify success or failure @@ -94,30 +97,40 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // Check pending queue and schedule next // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed writePendingIfAny(); + } }); + + //rescheduling message from queue after successfully sent + if(wasPending){ + byteBufMsg.resetReaderIndex(); + pending.remove(); + } + } catch (final WritePendingException e) { - queueRequest(ctx, byteBufMsg, promise); + + if(wasPending == false){ + queueRequest(ctx, byteBufMsg, promise); + } } } private synchronized void writePendingIfAny() { + if (pending.peek() == null) { return; } - // In case of pending, reschedule next message from queue - final PendingWriteRequest pendingWrite = pending.poll(); + final PendingWriteRequest pendingWrite = pending.peek(); final ByteBuf msg = pendingWrite.msg; if (LOG.isTraceEnabled()) { LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg)); } - writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg); + writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true); } public static String byteBufToString(final ByteBuf msg) { - msg.resetReaderIndex(); final String s = msg.toString(Charsets.UTF_8); msg.resetReaderIndex(); return s; @@ -144,6 +157,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { private Buffer toBuffer(final ByteBuf msg) { // TODO Buffer vs ByteBuf translate, Can we handle that better ? + msg.resetReaderIndex(); final byte[] temp = new byte[msg.readableBytes()]; msg.readBytes(temp, 0, msg.readableBytes()); return new Buffer(temp); -- 2.36.6