Fixed deadlock in AsyncSshHandlerWriter
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index 35f5d972c053ea07a861cf71640c9866f45f0177..e05408551f065bdb151e0306e6c51f3100734377 100644 (file)
@@ -12,15 +12,16 @@ import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.Queue;
-import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.Buffer;
+import org.apache.sshd.common.util.buffer.Buffer;
+import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +51,9 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         this.asyncIn = asyncIn;
     }
 
+    @GuardedBy("asyncInLock")
+    private boolean isWriteExecuted = false;
+
     public void write(final ChannelHandlerContext ctx,
             final Object msg, final ChannelPromise promise) {
         if (asyncIn == null) {
@@ -67,7 +71,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 promise.setFailure(new IllegalStateException("Channel closed"));
             } else {
                 final ByteBuf byteBufMsg = (ByteBuf) msg;
-                if (!pending.isEmpty()) {
+                if (isWriteExecuted) {
                     queueRequest(ctx, byteBufMsg, promise);
                     return;
                 }
@@ -86,49 +90,47 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
-            asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
-
-                @Override
-                public void operationComplete(final IoWriteFuture future) {
-                    // synchronized block due to deadlock that happens on ssh window resize
-                    // writes and pending writes would lock the underlyinch channel session
-                    // window resize write would try to write the message on an already locked channelSession,
-                    // while the pending write was in progress from the write callback
-                    synchronized (asyncInLock) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace(
-                                "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
-                                ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
-                        }
-
-                        // Notify success or failure
-                        if (future.isWritten()) {
-                            promise.setSuccess();
-                        } else {
-                            LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
-                                    byteBufToString(byteBufMsg), future.getException());
-                            promise.setFailure(future.getException());
-                        }
-
-                        // Not needed anymore, release
-                        byteBufMsg.release();
-
-                        //rescheduling message from queue after successfully sent
-                        if (wasPending) {
-                            byteBufMsg.resetReaderIndex();
-                            pending.remove();
-                        }
+
+            isWriteExecuted = true;
+
+            asyncIn.writePacket(toBuffer(byteBufMsg)).addListener(future -> {
+                // synchronized block due to deadlock that happens on ssh window resize
+                // writes and pending writes would lock the underlyinch channel session
+                // window resize write would try to write the message on an already locked channelSession,
+                // while the pending write was in progress from the write callback
+                synchronized (asyncInLock) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(
+                            "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                            ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                    }
+
+                    // Notify success or failure
+                    if (future.isWritten()) {
+                        promise.setSuccess();
+                    } else {
+                        LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
+                                byteBufToString(byteBufMsg), future.getException());
+                        promise.setFailure(future.getException());
                     }
 
-                    // Check pending queue and schedule next
-                    // At this time we are guaranteed that we are not in pending state anymore
-                    // so the next request should succeed
-                    writePendingIfAny();
+                    //rescheduling message from queue after successfully sent
+                    if (wasPending) {
+                        byteBufMsg.resetReaderIndex();
+                        pending.remove();
+                    }
+
+                    // Not needed anymore, release
+                    byteBufMsg.release();
                 }
-            });
 
-        } catch (final WritePendingException e) {
+                // Check pending queue and schedule next
+                // At this time we are guaranteed that we are not in pending state anymore
+                // so the next request should succeed
+                writePendingIfAny();
+            });
 
+        } catch (final IOException | WritePendingException e) {
             if (!wasPending) {
                 queueRequest(ctx, byteBufMsg, promise);
             }
@@ -138,6 +140,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     private void writePendingIfAny() {
         synchronized (asyncInLock) {
             if (pending.peek() == null) {
+                isWriteExecuted = false;
                 return;
             }
 
@@ -183,7 +186,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         msg.resetReaderIndex();
         final byte[] temp = new byte[msg.readableBytes()];
         msg.readBytes(temp, 0, msg.readableBytes());
-        return new Buffer(temp);
+        return new ByteArrayBuffer(temp);
     }
 
     private static final class PendingWriteRequest {