Fix racecondition in AsyncSshHandlerWriter
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index eace0ac7eafe1d09381fbe597f9f6acfece953e6..10941021bceaa1e12433ff5291e7d55741ad822a 100644 (file)
@@ -28,9 +28,9 @@ import org.slf4j.LoggerFactory;
  * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
  * Also handles pending writes by caching requests until pending state is over.
  */
-final class AsyncSshHandlerWriter implements AutoCloseable {
+public final class AsyncSshHandlerWriter implements AutoCloseable {
 
-    private static final Logger logger = LoggerFactory
+    private static final Logger LOG = LoggerFactory
             .getLogger(AsyncSshHandlerWriter.class);
 
     // public static final int MAX_PENDING_WRITES = 1000;
@@ -62,62 +62,77 @@ final class AsyncSshHandlerWriter implements AutoCloseable {
                 return;
             }
 
-            writeWithPendingDetection(ctx, promise, byteBufMsg);
+            writeWithPendingDetection(ctx, promise, byteBufMsg, false);
         }
     }
 
-    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
+    //sending message with pending
+    //if resending message not succesfull, then attribute wasPending is true
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) {
         try {
-            if (logger.isTraceEnabled()) {
-                logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
             asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
 
-                        @Override
-                        public void operationComplete(final IoWriteFuture future) {
-                            if (logger.isTraceEnabled()) {
-                                logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
-                                        ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
-                            }
-
-                            // Notify success or failure
-                            if (future.isWritten()) {
-                                promise.setSuccess();
-                            } else {
-                                logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
-                                promise.setFailure(future.getException());
-                            }
-
-                            // Not needed anymore, release
-                            byteBufMsg.release();
-
-                            // Check pending queue and schedule next
-                            // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
-                            writePendingIfAny();
+                @Override
+                public void operationComplete(final IoWriteFuture future) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                                ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                    }
+
+                    // Notify success or failure
+                    if (future.isWritten()) {
+                        promise.setSuccess();
+                    } else {
+                        LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+                        promise.setFailure(future.getException());
+                    }
+
+                    // Not needed anymore, release
+                    byteBufMsg.release();
+
+                    synchronized (AsyncSshHandlerWriter.this) {
+                        //rescheduling message from queue after successfully sent
+                        if (wasPending) {
+                            byteBufMsg.resetReaderIndex();
+                            pending.remove();
                         }
-                    });
+                    }
+
+                    // Check pending queue and schedule next
+                    // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
+                    writePendingIfAny();
+
+                }
+            });
+
         } catch (final WritePendingException e) {
-            queueRequest(ctx, byteBufMsg, promise);
+
+            if(wasPending == false){
+                queueRequest(ctx, byteBufMsg, promise);
+            }
         }
     }
 
     private synchronized void writePendingIfAny() {
+
         if (pending.peek() == null) {
             return;
         }
 
-        // In case of pending, reschedule next message from queue
-        final PendingWriteRequest pendingWrite = pending.poll();
+        final PendingWriteRequest pendingWrite = pending.peek();
         final ByteBuf msg = pendingWrite.msg;
-        if (logger.isTraceEnabled()) {
-            logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
         }
 
-        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
+        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
     }
 
-    private static String byteBufToString(final ByteBuf msg) {
-        msg.resetReaderIndex();
+    public static String byteBufToString(final ByteBuf msg) {
         final String s = msg.toString(Charsets.UTF_8);
         msg.resetReaderIndex();
         return s;
@@ -125,13 +140,13 @@ final class AsyncSshHandlerWriter implements AutoCloseable {
 
     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
 //        try {
-        logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
-        if (logger.isTraceEnabled()) {
-            logger.trace("Queueing request due to pending: {}", byteBufToString(msg));
+        LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
         }
         new PendingWriteRequest(ctx, msg, promise).pend(pending);
 //        } catch (final Exception ex) {
-//            logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+//            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
 //            msg.release();
 //            promise.setFailure(ex);
 //        }
@@ -144,6 +159,7 @@ final class AsyncSshHandlerWriter implements AutoCloseable {
 
     private Buffer toBuffer(final ByteBuf msg) {
         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+        msg.resetReaderIndex();
         final byte[] temp = new byte[msg.readableBytes()];
         msg.readBytes(temp, 0, msg.readableBytes());
         return new Buffer(temp);