Improve write error handling 75/108375/3
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 12 Oct 2023 15:45:50 +0000 (17:45 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 12 Oct 2023 20:42:49 +0000 (20:42 +0000)
We are swallowing errors if the attempt to write out fails. We also fail
to release the buffer in error paths.

Refactor the implementation to:
- release ByteBuf as soon as we have extracted bytes from it
- propagate failure to the promise
- emit a trace message if we encounter a non-ByteBuf

Also add a TODO to consider creating a ByteBufBuffer, which might reduce
the need copy bytes around.

JIRA: NETCONF-1106
Change-Id: I1da71b1f5c9959a5a32a9ce39067d25464b785c2
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystemOutboundChannelHandler.java

index e723e7c7dc4ad383b75d695dea149f18669ed347..c419ebd82e8a83fdf4f3d54d0264e18491000535 100644 (file)
@@ -15,6 +15,7 @@ import io.netty.channel.ChannelOutboundHandlerAdapter;
 import io.netty.channel.ChannelPromise;
 import java.io.IOException;
 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
+import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture;
 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,24 +35,42 @@ final class NetconfSubsystemOutboundChannelHandler extends ChannelOutboundHandle
 
     @Override
     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
-        if (msg instanceof ByteBuf byteBuf) {
-            // redirect channel outgoing packets to output stream linked to transport
-            final byte[] bytes = new byte[byteBuf.readableBytes()];
-            byteBuf.readBytes(bytes);
-            try {
-                out.writeBuffer(new ByteArrayBuffer(bytes))
-                    .addListener(future -> {
-                        if (future.isWritten()) {
-                            byteBuf.release(); // report outbound message being handled
-                            promise.setSuccess();
-                        } else if (future.getException() != null) {
-                            LOG.error("Error writing buffer", future.getException());
-                            promise.setFailure(future.getException());
-                        }
-                    });
-            } catch (IOException e) {
-                LOG.error("Error writing buffer", e);
-            }
+        // redirect channel outgoing packets to output stream linked to transport
+        if (!(msg instanceof ByteBuf byteBuf)) {
+            LOG.trace("Ignoring unrecognized {}", msg == null ? null : msg.getClass());
+            return;
+        }
+
+        final var sshBuf = toSshBuffer(byteBuf);
+        final IoWriteFuture writeFuture;
+        try {
+            writeFuture = out.writeBuffer(sshBuf);
+        } catch (IOException e) {
+            LOG.error("Error writing buffer", e);
+            promise.setFailure(e);
+            return;
         }
+
+        writeFuture.addListener(future -> {
+            if (future.isWritten()) {
+                // report outbound message being handled
+                promise.setSuccess();
+            } else if (future.getException() != null) {
+                LOG.error("Error writing buffer", future.getException());
+                promise.setFailure(future.getException());
+            }
+        });
+    }
+
+    // TODO: This can amount to a lot of copying around. Is it worth our while to create a ByteBufBuffer, which
+    //       would implement Buffer API on top a ByteBuf?
+    //       If we decide to do that, we need to decide to interface with ByteBuf (readRetainedSlice() ?) and then
+    //       release it only after the write has been resolved
+    private static ByteArrayBuffer toSshBuffer(final ByteBuf byteBuf) {
+        final var bytes = new byte[byteBuf.readableBytes()];
+        byteBuf.readBytes(bytes);
+        // Netty buffer can be recycled now
+        byteBuf.release();
+        return new ByteArrayBuffer(bytes);
     }
 }