Merge changes from topic 'mri-neon-sr1' into stable/neon
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index 96e54942dfac06cc9d9a779ea8a1f4e0ad5e83c2..e05408551f065bdb151e0306e6c51f3100734377 100644 (file)
@@ -12,6 +12,7 @@ import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Deque;
 import java.util.LinkedList;
@@ -20,6 +21,7 @@ import org.apache.sshd.common.io.IoOutputStream;
 import org.apache.sshd.common.io.WritePendingException;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +51,9 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         this.asyncIn = asyncIn;
     }
 
+    @GuardedBy("asyncInLock")
+    private boolean isWriteExecuted = false;
+
     public void write(final ChannelHandlerContext ctx,
             final Object msg, final ChannelPromise promise) {
         if (asyncIn == null) {
@@ -66,7 +71,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 promise.setFailure(new IllegalStateException("Channel closed"));
             } else {
                 final ByteBuf byteBufMsg = (ByteBuf) msg;
-                if (!pending.isEmpty()) {
+                if (isWriteExecuted) {
                     queueRequest(ctx, byteBufMsg, promise);
                     return;
                 }
@@ -85,7 +90,10 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
-            asyncIn.write(toBuffer(byteBufMsg)).addListener(future -> {
+
+            isWriteExecuted = true;
+
+            asyncIn.writePacket(toBuffer(byteBufMsg)).addListener(future -> {
                 // synchronized block due to deadlock that happens on ssh window resize
                 // writes and pending writes would lock the underlyinch channel session
                 // window resize write would try to write the message on an already locked channelSession,
@@ -122,8 +130,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 writePendingIfAny();
             });
 
-        } catch (final WritePendingException e) {
-
+        } catch (final IOException | WritePendingException e) {
             if (!wasPending) {
                 queueRequest(ctx, byteBufMsg, promise);
             }
@@ -133,6 +140,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     private void writePendingIfAny() {
         synchronized (asyncInLock) {
             if (pending.peek() == null) {
+                isWriteExecuted = false;
                 return;
             }