From: Robert Varga Date: Wed, 1 Jun 2022 16:28:38 +0000 (+0200) Subject: Improve write dequeing X-Git-Tag: v4.0.0~60 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=netconf.git;a=commitdiff_plain;h=ff4374a839351458ef1598827ce34363f35efea2 Improve write dequeing We should be peeking only once to see the actual result. Change-Id: Idaa132178f58253dcafca7b2bb4d09d9823ee453 Signed-off-by: Robert Varga --- diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java index 549bbe18cb..3477b0b924 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java @@ -5,10 +5,10 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.nettyutil.handler.ssh.client; -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -32,10 +32,7 @@ import org.slf4j.LoggerFactory; * Also handles pending writes by caching requests until pending state is over. */ public final class AsyncSshHandlerWriter implements AutoCloseable { - - private static final Logger LOG = LoggerFactory - .getLogger(AsyncSshHandlerWriter.class); - + private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerWriter.class); private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+"); // public static final int MAX_PENDING_WRITES = 1000; @@ -51,15 +48,14 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { // Order has to be preserved for queued writes private final Deque pending = new LinkedList<>(); + @GuardedBy("asyncInLock") + private boolean isWriteExecuted = false; + public AsyncSshHandlerWriter(final IoOutputStream asyncIn) { this.asyncIn = asyncIn; } - @GuardedBy("asyncInLock") - private boolean isWriteExecuted = false; - - public void write(final ChannelHandlerContext ctx, - final Object msg, final ChannelPromise promise) { + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { if (asyncIn == null) { promise.setFailure(new IllegalStateException("Channel closed")); return; @@ -143,12 +139,12 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { private void writePendingIfAny() { synchronized (asyncInLock) { - if (pending.peek() == null) { + final PendingWriteRequest pendingWrite = pending.peek(); + if (pendingWrite == null) { isWriteExecuted = false; return; } - final PendingWriteRequest pendingWrite = pending.peek(); final ByteBuf msg = pendingWrite.msg; if (LOG.isTraceEnabled()) { LOG.trace("Writing pending request on channel: {}, message: {}", @@ -163,7 +159,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { final String message = msg.toString(StandardCharsets.UTF_8); msg.resetReaderIndex(); Matcher matcher = NON_ASCII.matcher(message); - return matcher.replaceAll((data) -> { + return matcher.replaceAll(data -> { StringBuilder buf = new StringBuilder(); buf.append("\""); for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) { @@ -215,12 +211,12 @@ public final class AsyncSshHandlerWriter implements AutoCloseable { this.promise = promise; } - public void pend(final Queue pending) { + void pend(final Queue pending) { // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES, // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small", // pending.size(), ctx.channel()); - Preconditions.checkState(pending.offer(this), - "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel()); + checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s", + pending.size(), ctx.channel()); } } }