Fix deadlock in AsyncSshHandlerWriter
[controller.git] / opendaylight / netconf / netconf-netty-util / src / main / java / org / opendaylight / controller / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index 757dc1a2ba334d4169718e751a553d309e6240f6..667a4f6bf8dd23414986ba1de71598861155565e 100644 (file)
@@ -40,7 +40,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
     // when we send/queue 1 chunk and fail the other chunks
 
-    private IoOutputStream asyncIn;
+    private volatile IoOutputStream asyncIn;
 
     // Order has to be preserved for queued writes
     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
@@ -49,26 +49,36 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         this.asyncIn = asyncIn;
     }
 
-    public synchronized void write(final ChannelHandlerContext ctx,
+    public void write(final ChannelHandlerContext ctx,
             final Object msg, final ChannelPromise promise) {
-        // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
-        // If we are closed/closing, set immediate fail
-        if (asyncIn == null || asyncIn.isClosed() || asyncIn.isClosing()) {
+        if (asyncIn == null) {
             promise.setFailure(new IllegalStateException("Channel closed"));
-        } else {
-            final ByteBuf byteBufMsg = (ByteBuf) msg;
-            if (pending.isEmpty() == false) {
-                queueRequest(ctx, byteBufMsg, promise);
-                return;
-            }
+            return;
+        }
+        // synchronized block due to deadlock that happens on ssh window resize
+        // writes and pending writes would lock the underlyinch channel session
+        // window resize write would try to write the message on an already locked channelSession
+        // while the pending write was in progress from the write callback
+        synchronized (asyncIn) {
+            // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
+            // If we are closed/closing, set immediate fail
+            if (asyncIn.isClosed() || asyncIn.isClosing()) {
+                promise.setFailure(new IllegalStateException("Channel closed"));
+            } else {
+                final ByteBuf byteBufMsg = (ByteBuf) msg;
+                if (pending.isEmpty() == false) {
+                    queueRequest(ctx, byteBufMsg, promise);
+                    return;
+                }
 
-            writeWithPendingDetection(ctx, promise, byteBufMsg, false);
+                writeWithPendingDetection(ctx, promise, byteBufMsg, false);
+            }
         }
     }
 
     //sending message with pending
     //if resending message not succesfull, then attribute wasPending is true
-    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, boolean wasPending) {
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) {
         try {
 
             if (LOG.isTraceEnabled()) {
@@ -78,35 +88,40 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
 
                 @Override
                 public void operationComplete(final IoWriteFuture future) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
-                            ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
-                    }
-
-                    // Notify success or failure
-                    if (future.isWritten()) {
-                        promise.setSuccess();
-                    } else {
-                        LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
-                        promise.setFailure(future.getException());
+                    // synchronized block due to deadlock that happens on ssh window resize
+                    // writes and pending writes would lock the underlyinch channel session
+                    // window resize write would try to write the message on an already locked channelSession,
+                    // while the pending write was in progress from the write callback
+                    synchronized (asyncIn) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                                    ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                        }
+
+                        // Notify success or failure
+                        if (future.isWritten()) {
+                            promise.setSuccess();
+                        } else {
+                            LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
+                            promise.setFailure(future.getException());
+                        }
+
+                        // Not needed anymore, release
+                        byteBufMsg.release();
+
+                        //rescheduling message from queue after successfully sent
+                        if (wasPending) {
+                            byteBufMsg.resetReaderIndex();
+                            pending.remove();
+                        }
                     }
 
-                    // Not needed anymore, release
-                    byteBufMsg.release();
-
                     // Check pending queue and schedule next
                     // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
                     writePendingIfAny();
-
                 }
             });
 
-            //rescheduling message from queue after successfully sent
-            if(wasPending){
-                byteBufMsg.resetReaderIndex();
-                pending.remove();
-            }
-
         } catch (final WritePendingException e) {
 
             if(wasPending == false){
@@ -115,19 +130,20 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         }
     }
 
-    private synchronized void writePendingIfAny() {
+    private void writePendingIfAny() {
+        synchronized (asyncIn) {
+            if (pending.peek() == null) {
+                return;
+            }
 
-        if (pending.peek() == null) {
-            return;
-        }
+            final PendingWriteRequest pendingWrite = pending.peek();
+            final ByteBuf msg = pendingWrite.msg;
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+            }
 
-        final PendingWriteRequest pendingWrite = pending.peek();
-        final ByteBuf msg = pendingWrite.msg;
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+            writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
         }
-
-        writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
     }
 
     public static String byteBufToString(final ByteBuf msg) {
@@ -151,7 +167,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     }
 
     @Override
-    public synchronized void close() {
+    public void close() {
         asyncIn = null;
     }