Bug-2342: Fixing rpc-reply messages id's get mixed up 03/13403/11
authorMarian Dubai <mdubai@cisco.com>
Thu, 4 Dec 2014 09:03:41 +0000 (10:03 +0100)
committerMarian Dubai <mdubai@cisco.com>
Mon, 15 Dec 2014 13:32:44 +0000 (13:32 +0000)
Change-Id: If59b1dd30c6552efa93df7bb8d776396a9c7bafb
Signed-off-by: Marian Dubai <mdubai@cisco.com>
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java

index 1e976ce6a1394c31e0bbd8dd8746463193a3ac5c..757dc1a2ba334d4169718e751a553d309e6240f6 100644 (file)
@@ -62,12 +62,15 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 return;
             }
 
-            writeWithPendingDetection(ctx, promise, byteBufMsg);
+            writeWithPendingDetection(ctx, promise, byteBufMsg, false);
         }
     }
 
-    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
+    //sending message with pending
+    //if resending message not succesfull, then attribute wasPending is true
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, boolean wasPending) {
         try {
+
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
@@ -77,7 +80,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 public void operationComplete(final IoWriteFuture future) {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
-                                        ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                            ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
                     }
 
                     // Notify success or failure
@@ -94,30 +97,40 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                     // Check pending queue and schedule next
                     // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
                     writePendingIfAny();
+
                 }
             });
+
+            //rescheduling message from queue after successfully sent
+            if(wasPending){
+                byteBufMsg.resetReaderIndex();
+                pending.remove();
+            }
+
         } catch (final WritePendingException e) {
-            queueRequest(ctx, byteBufMsg, promise);
+
+            if(wasPending == false){
+                queueRequest(ctx, byteBufMsg, promise);
+            }
         }
     }
 
     private synchronized void writePendingIfAny() {
+
         if (pending.peek() == null) {
             return;
         }
 
-        // In case of pending, reschedule next message from queue
-        final PendingWriteRequest pendingWrite = pending.poll();
+        final PendingWriteRequest pendingWrite = pending.peek();
         final ByteBuf msg = pendingWrite.msg;
         if (LOG.isTraceEnabled()) {
             LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
         }
 
-        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
+        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
     }
 
     public static String byteBufToString(final ByteBuf msg) {
-        msg.resetReaderIndex();
         final String s = msg.toString(Charsets.UTF_8);
         msg.resetReaderIndex();
         return s;
@@ -144,6 +157,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
 
     private Buffer toBuffer(final ByteBuf msg) {
         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+        msg.resetReaderIndex();
         final byte[] temp = new byte[msg.readableBytes()];
         msg.readBytes(temp, 0, msg.readableBytes());
         return new Buffer(temp);