Fix racecondition in AsyncSshHandlerWriter
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index 1e976ce6a1394c31e0bbd8dd8746463193a3ac5c..10941021bceaa1e12433ff5291e7d55741ad822a 100644 (file)
@@ -62,12 +62,15 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 return;
             }
 
-            writeWithPendingDetection(ctx, promise, byteBufMsg);
+            writeWithPendingDetection(ctx, promise, byteBufMsg, false);
         }
     }
 
-    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
+    //sending message with pending
+    //if resending message not succesfull, then attribute wasPending is true
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) {
         try {
+
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
@@ -77,7 +80,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 public void operationComplete(final IoWriteFuture future) {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
-                                        ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                                ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
                     }
 
                     // Notify success or failure
@@ -91,33 +94,45 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                     // Not needed anymore, release
                     byteBufMsg.release();
 
+                    synchronized (AsyncSshHandlerWriter.this) {
+                        //rescheduling message from queue after successfully sent
+                        if (wasPending) {
+                            byteBufMsg.resetReaderIndex();
+                            pending.remove();
+                        }
+                    }
+
                     // Check pending queue and schedule next
                     // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
                     writePendingIfAny();
+
                 }
             });
+
         } catch (final WritePendingException e) {
-            queueRequest(ctx, byteBufMsg, promise);
+
+            if(wasPending == false){
+                queueRequest(ctx, byteBufMsg, promise);
+            }
         }
     }
 
     private synchronized void writePendingIfAny() {
+
         if (pending.peek() == null) {
             return;
         }
 
-        // In case of pending, reschedule next message from queue
-        final PendingWriteRequest pendingWrite = pending.poll();
+        final PendingWriteRequest pendingWrite = pending.peek();
         final ByteBuf msg = pendingWrite.msg;
         if (LOG.isTraceEnabled()) {
             LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
         }
 
-        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
+        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
     }
 
     public static String byteBufToString(final ByteBuf msg) {
-        msg.resetReaderIndex();
         final String s = msg.toString(Charsets.UTF_8);
         msg.resetReaderIndex();
         return s;
@@ -144,6 +159,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
 
     private Buffer toBuffer(final ByteBuf msg) {
         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+        msg.resetReaderIndex();
         final byte[] temp = new byte[msg.readableBytes()];
         msg.readBytes(temp, 0, msg.readableBytes());
         return new Buffer(temp);