X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2Fhandler%2Fssh%2Fclient%2FAsyncSshHandlerWriter.java;h=d13d45a238fcf2f618069ea3ea945bee72e1faef;hb=a80dddbb6eed5e87a4aa4917ce35c6ed94a53087;hp=bab4a7be632ffd58b98bb0208c8096bdffd5d4a5;hpb=2295d50e7212d80a9bc752f655fa66790ad45022;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index bab4a7be63..d13d45a238 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -5,21 +5,24 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.nettyutil.handler.ssh.client; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Deque; import java.util.LinkedList; -import java.util.Queue; -import org.apache.sshd.common.io.IoOutputStream; -import org.apache.sshd.common.io.WritePendingException; -import org.apache.sshd.common.util.buffer.Buffer; -import org.apache.sshd.common.util.buffer.ByteArrayBuffer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream; +import org.opendaylight.netconf.shaded.sshd.common.io.WritePendingException; +import org.opendaylight.netconf.shaded.sshd.common.util.buffer.Buffer; +import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,9 +31,8 @@ import org.slf4j.LoggerFactory; * Also handles pending writes by caching requests until pending state is over. */ public final class AsyncSshHandlerWriter implements AutoCloseable { - - private static final Logger LOG = LoggerFactory - .getLogger(AsyncSshHandlerWriter.class); + private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerWriter.class); + private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+"); // public static final int MAX_PENDING_WRITES = 1000; // TODO implement Limiting mechanism for pending writes @@ -45,12 +47,14 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // Order has to be preserved for queued writes private final Deque pending = new LinkedList<>(); + @GuardedBy("asyncInLock") + private boolean isWriteExecuted = false; + public AsyncSshHandlerWriter(final IoOutputStream asyncIn) { this.asyncIn = asyncIn; } - public void write(final ChannelHandlerContext ctx, - final Object msg, final ChannelPromise promise) { + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { if (asyncIn == null) { promise.setFailure(new IllegalStateException("Channel closed")); return; @@ -66,7 +70,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { promise.setFailure(new IllegalStateException("Channel closed")); } else { final ByteBuf byteBufMsg = (ByteBuf) msg; - if (!pending.isEmpty()) { + if (isWriteExecuted) { queueRequest(ctx, byteBufMsg, promise); return; } @@ -85,7 +89,10 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { if (LOG.isTraceEnabled()) { LOG.trace("Writing request on channel: {}, message: {}", ctx.channel(), byteBufToString(byteBufMsg)); } - asyncIn.write(toBuffer(byteBufMsg)).addListener(future -> { + + isWriteExecuted = true; + + asyncIn.writeBuffer(toBuffer(byteBufMsg)).addListener(future -> { // synchronized block due to deadlock that happens on ssh window resize // writes and pending writes would lock the underlyinch channel session // window resize write would try to write the message on an already locked channelSession, @@ -106,14 +113,14 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { promise.setFailure(future.getException()); } - // Not needed anymore, release - byteBufMsg.release(); - //rescheduling message from queue after successfully sent if (wasPending) { byteBufMsg.resetReaderIndex(); pending.remove(); } + + // Not needed anymore, release + byteBufMsg.release(); } // Check pending queue and schedule next @@ -122,8 +129,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { writePendingIfAny(); }); - } catch (final WritePendingException e) { - + } catch (final IOException | WritePendingException e) { if (!wasPending) { queueRequest(ctx, byteBufMsg, promise); } @@ -132,11 +138,12 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { private void writePendingIfAny() { synchronized (asyncInLock) { - if (pending.peek() == null) { + final PendingWriteRequest pendingWrite = pending.peek(); + if (pendingWrite == null) { + isWriteExecuted = false; return; } - final PendingWriteRequest pendingWrite = pending.peek(); final ByteBuf msg = pendingWrite.msg; if (LOG.isTraceEnabled()) { LOG.trace("Writing pending request on channel: {}, message: {}", @@ -148,18 +155,33 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { } public static String byteBufToString(final ByteBuf msg) { - final String s = msg.toString(StandardCharsets.UTF_8); + final String message = msg.toString(StandardCharsets.UTF_8); msg.resetReaderIndex(); - return s; + Matcher matcher = NON_ASCII.matcher(message); + return matcher.replaceAll(data -> { + StringBuilder buf = new StringBuilder(); + buf.append("\""); + for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) { + buf.append(String.format("%02X", b)); + } + buf.append("\""); + return buf.toString(); + }); } private void queueRequest(final ChannelHandlerContext ctx, final ByteBuf msg, final ChannelPromise promise) { -// try { LOG.debug("Write pending on channel: {}, queueing, current queue size: {}", ctx.channel(), pending.size()); if (LOG.isTraceEnabled()) { LOG.trace("Queueing request due to pending: {}", byteBufToString(msg)); } - new PendingWriteRequest(ctx, msg, promise).pend(pending); + +// try { + final var req = new PendingWriteRequest(ctx, msg, promise); + // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES, + // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small", + // pending.size(), ctx.channel()); + checkState(pending.offer(req), "Cannot pend another request write (pending count: %s) on channel: %s", + pending.size(), ctx.channel()); // } catch (final Exception ex) { // LOG.warn("Unable to queue write request on channel: {}. Setting fail for the request: {}", // ctx.channel(), ex, byteBufToString(msg)); @@ -193,13 +215,5 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { this.msg = msg; this.promise = promise; } - - public void pend(final Queue pending) { - // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES, - // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small", - // pending.size(), ctx.channel()); - Preconditions.checkState(pending.offer(this), - "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel()); - } } }