Improve write dequeing 12/101412/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 1 Jun 2022 16:28:38 +0000 (18:28 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 1 Jun 2022 16:29:23 +0000 (18:29 +0200)
We should be peeking only once to see the actual result.

Change-Id: Idaa132178f58253dcafca7b2bb4d09d9823ee453
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/ssh/client/AsyncSshHandlerWriter.java

index 549bbe18cb96dc67e99bce2dff6134717f82601e..3477b0b9249b035ee5a85883eb281d83172e67a2 100644 (file)
@@ -5,10 +5,10 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.nettyutil.handler.ssh.client;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
@@ -32,10 +32,7 @@ import org.slf4j.LoggerFactory;
  * Also handles pending writes by caching requests until pending state is over.
  */
 public final class AsyncSshHandlerWriter implements AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory
-            .getLogger(AsyncSshHandlerWriter.class);
-
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncSshHandlerWriter.class);
     private static final Pattern NON_ASCII = Pattern.compile("([^\\x20-\\x7E\\x0D\\x0A])+");
 
     // public static final int MAX_PENDING_WRITES = 1000;
@@ -51,15 +48,14 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
     // Order has to be preserved for queued writes
     private final Deque<PendingWriteRequest> pending = new LinkedList<>();
 
+    @GuardedBy("asyncInLock")
+    private boolean isWriteExecuted = false;
+
     public AsyncSshHandlerWriter(final IoOutputStream asyncIn) {
         this.asyncIn = asyncIn;
     }
 
-    @GuardedBy("asyncInLock")
-    private boolean isWriteExecuted = false;
-
-    public void write(final ChannelHandlerContext ctx,
-            final Object msg, final ChannelPromise promise) {
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
         if (asyncIn == null) {
             promise.setFailure(new IllegalStateException("Channel closed"));
             return;
@@ -143,12 +139,12 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
 
     private void writePendingIfAny() {
         synchronized (asyncInLock) {
-            if (pending.peek() == null) {
+            final PendingWriteRequest pendingWrite = pending.peek();
+            if (pendingWrite == null) {
                 isWriteExecuted = false;
                 return;
             }
 
-            final PendingWriteRequest pendingWrite = pending.peek();
             final ByteBuf msg = pendingWrite.msg;
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Writing pending request on channel: {}, message: {}",
@@ -163,7 +159,7 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
         final String message = msg.toString(StandardCharsets.UTF_8);
         msg.resetReaderIndex();
         Matcher matcher = NON_ASCII.matcher(message);
-        return matcher.replaceAll((data) -> {
+        return matcher.replaceAll(data -> {
             StringBuilder buf = new StringBuilder();
             buf.append("\"");
             for (byte b : data.group().getBytes(StandardCharsets.US_ASCII)) {
@@ -215,12 +211,12 @@ public final class AsyncSshHandlerWriter implements AutoCloseable {
             this.promise = promise;
         }
 
-        public void pend(final Queue<PendingWriteRequest> pending) {
+        void pend(final Queue<PendingWriteRequest> pending) {
             // Preconditions.checkState(pending.size() < MAX_PENDING_WRITES,
             // "Too much pending writes(%s) on channel: %s, remote window is not getting read or is too small",
             // pending.size(), ctx.channel());
-            Preconditions.checkState(pending.offer(this),
-                "Cannot pend another request write (pending count: %s) on channel: %s", pending.size(), ctx.channel());
+            checkState(pending.offer(this), "Cannot pend another request write (pending count: %s) on channel: %s",
+                pending.size(), ctx.channel());
         }
     }
 }