Move PendingWriteRequest.pend()
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index f35372742e9b3aa7f4c54bc6fe6fd05dbab71be9..d13d45a238fcf2f618069ea3ea945bee72e1faef 100644 (file)
@@ -5,10 +5,10 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
@@ -16,11 +16,13 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Deque;
 import java.util.LinkedList;
-import java.util.Queue;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.buffer.Buffer;
-import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,9 +31,8 @@ import org.slf4j.LoggerFactory;
  * Also handles pending writes by caching requests until pending state is over.
  */
 public final class AsyncSshHandlerWriter implements AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory
-            .getLogger(AsyncSshHandlerWriter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerWriter.class);
+    private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+");
 
     // public static final int MAX_PENDING_WRITES = 1000;
     // TODO implement Limiting mechanism for pending writes
@@ -46,12 +47,14 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     // Order has to be preserved for queued writes
     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
 
+    @GuardedBy("asyncInLock")
+    private boolean isWriteExecuted = false;
+
     public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
         this.asyncIn = asyncIn;
     }
 
-    public void write(final ChannelHandlerContext ctx,
-            final Object msg, final ChannelPromise promise) {
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
         if (asyncIn == null) {
             promise.setFailure(new IllegalStateException("Channel closed"));
             return;
@@ -67,7 +70,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 promise.setFailure(new IllegalStateException("Channel closed"));
             } else {
                 final ByteBuf byteBufMsg = (ByteBuf) msg;
-                if (!pending.isEmpty()) {
+                if (isWriteExecuted) {
                     queueRequest(ctx, byteBufMsg, promise);
                     return;
                 }
@@ -86,7 +89,10 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
-            asyncIn.writePacket(toBuffer(byteBufMsg)).addListener(future -> {
+
+            isWriteExecuted = true;
+
+            asyncIn.writeBuffer(toBuffer(byteBufMsg)).addListener(future -> {
                 // synchronized block due to deadlock that happens on ssh window resize
                 // writes and pending writes would lock the underlyinch channel session
                 // window resize write would try to write the message on an already locked channelSession,
@@ -132,11 +138,12 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
 
     private void writePendingIfAny() {
         synchronized (asyncInLock) {
-            if (pending.peek() == null) {
+            final PendingWriteRequest pendingWrite = pending.peek();
+            if (pendingWrite == null) {
+                isWriteExecuted = false;
                 return;
             }
 
-            final PendingWriteRequest pendingWrite = pending.peek();
             final ByteBuf msg = pendingWrite.msg;
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing pending request on channel: {}, message: {}",
@@ -148,18 +155,33 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     }
 
     public static String byteBufToString(final ByteBuf msg) {
-        final String s = msg.toString(StandardCharsets.UTF_8);
+        final String message = msg.toString(StandardCharsets.UTF_8);
         msg.resetReaderIndex();
-        return s;
+        Matcher matcher = NON_ASCII.matcher(message);
+        return matcher.replaceAll(data -> {
+            StringBuilder buf = new StringBuilder();
+            buf.append("\"");
+            for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) {
+                buf.append(String.format("%02X", b));
+            }
+            buf.append("\"");
+            return buf.toString();
+        });
     }
 
     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
-//        try {
         LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
         if (LOG.isTraceEnabled()) {
             LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
         }
-        new PendingWriteRequest(ctx, msg, promise).pend(pending);
+
+//      try {
+        final var req = new PendingWriteRequest(ctx, msg, promise);
+        // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
+        // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
+        // pending.size(), ctx.channel());
+        checkState(pending.offer(req), "Cannot pend another request write (pending count: %s) on channel: %s",
+                pending.size(), ctx.channel());
 //        } catch (final Exception ex) {
 //            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
 //                    ctx.channel(), ex, byteBufToString(msg));
@@ -193,13 +215,5 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
             this.msg = msg;
             this.promise = promise;
         }
-
-        public void pend(final Queue<PendingWriteRequest> pending) {
-            // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
-            // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
-            // pending.size(), ctx.channel());
-            Preconditions.checkState(pending.offer(this),
-                "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
-        }
     }
 }