Fix deadlock in AsyncSshHandlerWriter
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index eace0ac7eafe1d09381fbe597f9f6acfece953e6..667a4f6bf8dd23414986ba1de71598861155565e 100644 (file)
@@ -28,9 +28,9 @@ import org.slf4j.LoggerFactory;
  * Async Ssh writer. Takes messages(byte arrays) and sends them encrypted to remote server.
  * Also handles pending writes by caching requests until pending state is over.
  */
-final class AsyncSshHandlerWriter implements AutoCloseable {
+public final class AsyncSshHandlerWriter implements AutoCloseable {
 
-    private static final Logger logger = LoggerFactory
+    private static final Logger LOG = LoggerFactory
             .getLogger(AsyncSshHandlerWriter.class);
 
     // public static final int MAX_PENDING_WRITES = 1000;
@@ -40,7 +40,7 @@ final class AsyncSshHandlerWriter implements AutoCloseable {
     // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
     // when we send/queue 1 chunk and fail the other chunks
 
-    private IoOutputStream asyncIn;
+    private volatile IoOutputStream asyncIn;
 
     // Order has to be preserved for queued writes
     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
@@ -49,75 +49,104 @@ final class AsyncSshHandlerWriter implements AutoCloseable {
         this.asyncIn = asyncIn;
     }
 
-    public synchronized void write(final ChannelHandlerContext ctx,
+    public void write(final ChannelHandlerContext ctx,
             final Object msg, final ChannelPromise promise) {
-        // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
-        // If we are closed/closing, set immediate fail
-        if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+        if (asyncIn == null) {
             promise.setFailure(new IllegalStateException("Channel closed"));
-        } else {
-            final ByteBuf byteBufMsg = (ByteBuf) msg;
-            if (pending.isEmpty() == false) {
-                queueRequest(ctx, byteBufMsg, promise);
-                return;
+            return;
+        }
+        // synchronized block due to deadlock that happens on ssh window resize
+        // writes and pending writes would lock the underlyinch channel session
+        // window resize write would try to write the message on an already locked channelSession
+        // while the pending write was in progress from the write callback
+        synchronized (asyncIn) {
+            // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
+            // If we are closed/closing, set immediate fail
+            if (asyncIn.isClosed() || asyncIn.isClosing()) {
+                promise.setFailure(new IllegalStateException("Channel closed"));
+            } else {
+                final ByteBuf byteBufMsg = (ByteBuf) msg;
+                if (pending.isEmpty() == false) {
+                    queueRequest(ctx, byteBufMsg, promise);
+                    return;
+                }
+
+                writeWithPendingDetection(ctx, promise, byteBufMsg, false);
             }
-
-            writeWithPendingDetection(ctx, promise, byteBufMsg);
         }
     }
 
-    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg) {
+    //sending message with pending
+    //if resending message not succesfull, then attribute wasPending is true
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) {
         try {
-            if (logger.isTraceEnabled()) {
-                logger.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
             asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
 
-                        @Override
-                        public void operationComplete(final IoWriteFuture future) {
-                            if (logger.isTraceEnabled()) {
-                                logger.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
-                                        ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
-                            }
-
-                            // Notify success or failure
-                            if (future.isWritten()) {
-                                promise.setSuccess();
-                            } else {
-                                logger.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
-                                promise.setFailure(future.getException());
-                            }
-
-                            // Not needed anymore, release
-                            byteBufMsg.release();
-
-                            // Check pending queue and schedule next
-                            // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
-                            writePendingIfAny();
+                @Override
+                public void operationComplete(final IoWriteFuture future) {
+                    // synchronized block due to deadlock that happens on ssh window resize
+                    // writes and pending writes would lock the underlyinch channel session
+                    // window resize write would try to write the message on an already locked channelSession,
+                    // while the pending write was in progress from the write callback
+                    synchronized (asyncIn) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                                    ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
                         }
-                    });
+
+                        // Notify success or failure
+                        if (future.isWritten()) {
+                            promise.setSuccess();
+                        } else {
+                            LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+                            promise.setFailure(future.getException());
+                        }
+
+                        // Not needed anymore, release
+                        byteBufMsg.release();
+
+                        //rescheduling message from queue after successfully sent
+                        if (wasPending) {
+                            byteBufMsg.resetReaderIndex();
+                            pending.remove();
+                        }
+                    }
+
+                    // Check pending queue and schedule next
+                    // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
+                    writePendingIfAny();
+                }
+            });
+
         } catch (final WritePendingException e) {
-            queueRequest(ctx, byteBufMsg, promise);
+
+            if(wasPending == false){
+                queueRequest(ctx, byteBufMsg, promise);
+            }
         }
     }
 
-    private synchronized void writePendingIfAny() {
-        if (pending.peek() == null) {
-            return;
-        }
+    private void writePendingIfAny() {
+        synchronized (asyncIn) {
+            if (pending.peek() == null) {
+                return;
+            }
 
-        // In case of pending, reschedule next message from queue
-        final PendingWriteRequest pendingWrite = pending.poll();
-        final ByteBuf msg = pendingWrite.msg;
-        if (logger.isTraceEnabled()) {
-            logger.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
-        }
+            final PendingWriteRequest pendingWrite = pending.peek();
+            final ByteBuf msg = pendingWrite.msg;
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+            }
 
-        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg);
+            writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
+        }
     }
 
-    private static String byteBufToString(final ByteBuf msg) {
-        msg.resetReaderIndex();
+    public static String byteBufToString(final ByteBuf msg) {
         final String s = msg.toString(Charsets.UTF_8);
         msg.resetReaderIndex();
         return s;
@@ -125,25 +154,26 @@ final class AsyncSshHandlerWriter implements AutoCloseable {
 
     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
 //        try {
-        logger.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
-        if (logger.isTraceEnabled()) {
-            logger.trace("Queueing request due to pending: {}", byteBufToString(msg));
+        LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
         }
         new PendingWriteRequest(ctx, msg, promise).pend(pending);
 //        } catch (final Exception ex) {
-//            logger.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+//            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
 //            msg.release();
 //            promise.setFailure(ex);
 //        }
     }
 
     @Override
-    public synchronized void close() {
+    public void close() {
         asyncIn = null;
     }
 
     private Buffer toBuffer(final ByteBuf msg) {
         // TODO Buffer vs ByteBuf translate, Can we handle that better ?
+        msg.resetReaderIndex();
         final byte[] temp = new byte[msg.readableBytes()];
         msg.readBytes(temp, 0, msg.readableBytes());
         return new Buffer(temp);