Move PendingWriteRequest.pend()
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / ssh / client / AsyncSshHandlerWriter.java
index 35f5d972c053ea07a861cf71640c9866f45f0177..d13d45a238fcf2f618069ea3ea945bee72e1faef 100644 (file)
@@ -5,22 +5,24 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Deque;
 import java.util.LinkedList;
-import java.util.Queue;
-import org.apache.sshd.common.future.SshFutureListener;
-import org.apache.sshd.common.io.IoOutputStream;
-import org.apache.sshd.common.io.IoWriteFuture;
-import org.apache.sshd.common.io.WritePendingException;
-import org.apache.sshd.common.util.Buffer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer;
+import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,9 +31,8 @@ import org.slf4j.LoggerFactory;
  * Also handles pending writes by caching requests until pending state is over.
  */
 public final class AsyncSshHandlerWriter implements AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory
-            .getLogger(AsyncSshHandlerWriter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerWriter.class);
+    private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+");
 
     // public static final int MAX_PENDING_WRITES = 1000;
     // TODO implement Limiting mechanism for pending writes
@@ -46,12 +47,14 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     // Order has to be preserved for queued writes
     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
 
+    @GuardedBy("asyncInLock")
+    private boolean isWriteExecuted = false;
+
     public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
         this.asyncIn = asyncIn;
     }
 
-    public void write(final ChannelHandlerContext ctx,
-            final Object msg, final ChannelPromise promise) {
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
         if (asyncIn == null) {
             promise.setFailure(new IllegalStateException("Channel closed"));
             return;
@@ -67,7 +70,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
                 promise.setFailure(new IllegalStateException("Channel closed"));
             } else {
                 final ByteBuf byteBufMsg = (ByteBuf) msg;
-                if (!pending.isEmpty()) {
+                if (isWriteExecuted) {
                     queueRequest(ctx, byteBufMsg, promise);
                     return;
                 }
@@ -86,49 +89,47 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg));
             }
-            asyncIn.write(toBuffer(byteBufMsg)).addListener(new SshFutureListener<IoWriteFuture>() {
-
-                @Override
-                public void operationComplete(final IoWriteFuture future) {
-                    // synchronized block due to deadlock that happens on ssh window resize
-                    // writes and pending writes would lock the underlyinch channel session
-                    // window resize write would try to write the message on an already locked channelSession,
-                    // while the pending write was in progress from the write callback
-                    synchronized (asyncInLock) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace(
-                                "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
-                                ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
-                        }
-
-                        // Notify success or failure
-                        if (future.isWritten()) {
-                            promise.setSuccess();
-                        } else {
-                            LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
-                                    byteBufToString(byteBufMsg), future.getException());
-                            promise.setFailure(future.getException());
-                        }
-
-                        // Not needed anymore, release
-                        byteBufMsg.release();
-
-                        //rescheduling message from queue after successfully sent
-                        if (wasPending) {
-                            byteBufMsg.resetReaderIndex();
-                            pending.remove();
-                        }
+
+            isWriteExecuted = true;
+
+            asyncIn.writeBuffer(toBuffer(byteBufMsg)).addListener(future -> {
+                // synchronized block due to deadlock that happens on ssh window resize
+                // writes and pending writes would lock the underlyinch channel session
+                // window resize write would try to write the message on an already locked channelSession,
+                // while the pending write was in progress from the write callback
+                synchronized (asyncInLock) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace(
+                            "Ssh write request finished on channel: {} with result: {}: and ex:{}, message: {}",
+                            ctx.channel(), future.isWritten(), future.getException(), byteBufToString(byteBufMsg));
+                    }
+
+                    // Notify success or failure
+                    if (future.isWritten()) {
+                        promise.setSuccess();
+                    } else {
+                        LOG.warn("Ssh write request failed on channel: {} for message: {}", ctx.channel(),
+                                byteBufToString(byteBufMsg), future.getException());
+                        promise.setFailure(future.getException());
+                    }
+
+                    //rescheduling message from queue after successfully sent
+                    if (wasPending) {
+                        byteBufMsg.resetReaderIndex();
+                        pending.remove();
                     }
 
-                    // Check pending queue and schedule next
-                    // At this time we are guaranteed that we are not in pending state anymore
-                    // so the next request should succeed
-                    writePendingIfAny();
+                    // Not needed anymore, release
+                    byteBufMsg.release();
                 }
-            });
 
-        } catch (final WritePendingException e) {
+                // Check pending queue and schedule next
+                // At this time we are guaranteed that we are not in pending state anymore
+                // so the next request should succeed
+                writePendingIfAny();
+            });
 
+        } catch (final IOException | WritePendingException e) {
             if (!wasPending) {
                 queueRequest(ctx, byteBufMsg, promise);
             }
@@ -137,11 +138,12 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
 
     private void writePendingIfAny() {
         synchronized (asyncInLock) {
-            if (pending.peek() == null) {
+            final PendingWriteRequest pendingWrite = pending.peek();
+            if (pendingWrite == null) {
+                isWriteExecuted = false;
                 return;
             }
 
-            final PendingWriteRequest pendingWrite = pending.peek();
             final ByteBuf msg = pendingWrite.msg;
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing pending request on channel: {}, message: {}",
@@ -153,18 +155,33 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     }
 
     public static String byteBufToString(final ByteBuf msg) {
-        final String s = msg.toString(StandardCharsets.UTF_8);
+        final String message = msg.toString(StandardCharsets.UTF_8);
         msg.resetReaderIndex();
-        return s;
+        Matcher matcher = NON_ASCII.matcher(message);
+        return matcher.replaceAll(data -> {
+            StringBuilder buf = new StringBuilder();
+            buf.append("\"");
+            for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) {
+                buf.append(String.format("%02X", b));
+            }
+            buf.append("\"");
+            return buf.toString();
+        });
     }
 
     private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) {
-//        try {
         LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size());
         if (LOG.isTraceEnabled()) {
             LOG.trace("Queueing request due to pending: {}", byteBufToString(msg));
         }
-        new PendingWriteRequest(ctx, msg, promise).pend(pending);
+
+//      try {
+        final var req = new PendingWriteRequest(ctx, msg, promise);
+        // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
+        // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
+        // pending.size(), ctx.channel());
+        checkState(pending.offer(req), "Cannot pend another request write (pending count: %s) on channel: %s",
+                pending.size(), ctx.channel());
 //        } catch (final Exception ex) {
 //            LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}",
 //                    ctx.channel(), ex, byteBufToString(msg));
@@ -183,7 +200,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         msg.resetReaderIndex();
         final byte[] temp = new byte[msg.readableBytes()];
         msg.readBytes(temp, 0, msg.readableBytes());
-        return new Buffer(temp);
+        return new ByteArrayBuffer(temp);
     }
 
     private static final class PendingWriteRequest {
@@ -198,13 +215,5 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
             this.msg = msg;
             this.promise = promise;
         }
-
-        public void pend(final Queue<PendingWriteRequest> pending) {
-            // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
-            // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
-            // pending.size(), ctx.channel());
-            Preconditions.checkState(pending.offer(this),
-                "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
-        }
     }
 }