Fixed deadlock in AsyncSshHandlerWriter
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index 7753c9fd43a39b8f06247a4356b6f9d73eaef4f7..e05408551f065bdb151e0306e6c51f3100734377 100644 (file)
@@ -12,16 +12,16 @@ import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.Queue;
-import org.apache.sshd.common.future.SshFutureListener;
 import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoWriteFuture;
 import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,9 +38,10 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     // TODO implement Limiting mechanism for pending writes
     // But there is a possible issue with limiting:
     // 1. What to do when queue is full ? Immediate Fail for every request ?
-    // 2. At this level we might be dealing with Chunks of messages(not whole messages) and unexpected behavior might occur
-    // when we send/queue 1 chunk and fail the other chunks
+    // 2. At this level we might be dealing with Chunks of messages(not whole messages)
+    // and unexpected behavior might occur when we send/queue 1 chunk and fail the other chunks
 
+    private final Object asyncInLock = new Object();
     private volatile IoOutputStream asyncIn;
 
     // Order has to be preserved for queued writes
@@ -50,6 +51,9 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         this.asyncIn = asyncIn;
     }
 
+    @GuardedBy("asyncInLock")
+    private boolean isWriteExecuted = false;
+
     public void write(final ChannelHandlerContext ctx,
             final Object msg, final ChannelPromise promise) {
         if (asyncIn == null) {
@@ -60,14 +64,14 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         // writes and pending writes would lock the underlyinch channel session
         // window resize write would try to write the message on an already locked channelSession
         // while the pending write was in progress from the write callback
-        synchronized (asyncIn) {
+        synchronized (asyncInLock) {
             // TODO check for isClosed, isClosing might be performed by mina SSH internally and is not required here
             // If we are closed/closing, set immediate fail
             if (asyncIn.isClosed() || asyncIn.isClosing()) {
                 promise.setFailure(new IllegalStateException("Channel closed"));
             } else {
                 final ByteBuf byteBufMsg = (ByteBuf) msg;
-                if (pending.isEmpty() == false) {
+                if (isWriteExecuted) {
                     queueRequest(ctx, byteBufMsg, promise);
                     return;
                 }
@@ -79,68 +83,72 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
 
     //sending message with pending
     //if resending message not succesfull, then attribute wasPending is true
-    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise, final ByteBuf byteBufMsg, final boolean wasPending) {
+    private void writeWithPendingDetection(final ChannelHandlerContext ctx, final ChannelPromise promise,
+                                           final ByteBuf byteBufMsg, final boolean wasPending) {
         try {
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
-            asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
-
-                @Override
-                public void operationComplete(final IoWriteFuture future) {
-                    // synchronized block due to deadlock that happens on ssh window resize
-                    // writes and pending writes would lock the underlyinch channel session
-                    // window resize write would try to write the message on an already locked channelSession,
-                    // while the pending write was in progress from the write callback
-                    synchronized (asyncIn) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
-                                    ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
-                        }
-
-                        // Notify success or failure
-                        if (future.isWritten()) {
-                            promise.setSuccess();
-                        } else {
-                            LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(), byteBufToString(byteBufMsg), future.getException());
-                            promise.setFailure(future.getException());
-                        }
-
-                        // Not needed anymore, release
-                        byteBufMsg.release();
-
-                        //rescheduling message from queue after successfully sent
-                        if (wasPending) {
-                            byteBufMsg.resetReaderIndex();
-                            pending.remove();
-                        }
+
+            isWriteExecuted = true;
+
+            asyncIn.writePacket(toBuffer(byteBufMsg)).addListener(future -> {
+                // synchronized block due to deadlock that happens on ssh window resize
+                // writes and pending writes would lock the underlyinch channel session
+                // window resize write would try to write the message on an already locked channelSession,
+                // while the pending write was in progress from the write callback
+                synchronized (asyncInLock) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(
+                            "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                            ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
                     }
 
-                    // Check pending queue and schedule next
-                    // At this time we are guaranteed that we are not in pending state anymore so the next request should succeed
-                    writePendingIfAny();
+                    // Notify success or failure
+                    if (future.isWritten()) {
+                        promise.setSuccess();
+                    } else {
+                        LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
+                                byteBufToString(byteBufMsg), future.getException());
+                        promise.setFailure(future.getException());
+                    }
+
+                    //rescheduling message from queue after successfully sent
+                    if (wasPending) {
+                        byteBufMsg.resetReaderIndex();
+                        pending.remove();
+                    }
+
+                    // Not needed anymore, release
+                    byteBufMsg.release();
                 }
-            });
 
-        } catch (final WritePendingException e) {
+                // Check pending queue and schedule next
+                // At this time we are guaranteed that we are not in pending state anymore
+                // so the next request should succeed
+                writePendingIfAny();
+            });
 
-            if(wasPending == false){
+        } catch (final IOException | WritePendingException e) {
+            if (!wasPending) {
                 queueRequest(ctx, byteBufMsg, promise);
             }
         }
     }
 
     private void writePendingIfAny() {
-        synchronized (asyncIn) {
+        synchronized (asyncInLock) {
             if (pending.peek() == null) {
+                isWriteExecuted = false;
                 return;
             }
 
             final PendingWriteRequest pendingWrite = pending.peek();
             final ByteBuf msg = pendingWrite.msg;
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Writing pending request on channel: {}, message: {}", pendingWrite.ctx.channel(), byteBufToString(msg));
+                LOG.trace("Writing pending request on channel: {}, message: {}",
+                        pendingWrite.ctx.channel(), byteBufToString(msg));
             }
 
             writeWithPendingDetection(pendingWrite.ctx, pendingWrite.promise, msg, true);
@@ -161,7 +169,8 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         }
         new PendingWriteRequest(ctx, msg, promise).pend(pending);
 //        } catch (final Exception ex) {
-//            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", ctx.channel(), ex, byteBufToString(msg));
+//            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
+//                    ctx.channel(), ex, byteBufToString(msg));
 //            msg.release();
 //            promise.setFailure(ex);
 //        }
@@ -173,10 +182,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     }
 
     private static Buffer toBuffer(final ByteBuf msg) {
-        // FIXME: Translation from ByteBuf to Buffer. Buffer is an abstract class, so based on the assumptions
-        //        we can make about the contents of ByteBuf, we should be able to skip copying byte arrays around
-        //        by creating an appropriate subclass.
-
+        // TODO Buffer vs ByteBuf translate, Can we handle that better ?
         msg.resetReaderIndex();
         final byte[] temp = new byte[msg.readableBytes()];
         msg.readBytes(temp, 0, msg.readableBytes());
@@ -188,7 +194,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         private final ByteBuf msg;
         private final ChannelPromise promise;
 
-        public PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
+        PendingWriteRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
             this.ctx = ctx;
             // Reset reader index, last write (failed) attempt moved index to the end
             msg.resetReaderIndex();
@@ -200,8 +206,8 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
             // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
             // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
             // pending.size(), ctx.channel());
-            Preconditions.checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s",
-                    pending.size(), ctx.channel());
+            Preconditions.checkState(pending.offer(this),
+                "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
         }
     }
 }