From 11fccce495de1e788ad21c203b3d1a4a4ad38be5 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Thu, 12 Oct 2023 17:45:50 +0200 Subject: [PATCH] Improve write error handling We are swallowing errors if the attempt to write out fails. We also fail to release the buffer in error paths. Refactor the implementation to: - release ByteBuf as soon as we have extracted bytes from it - propagate failure to the promise - emit a trace message if we encounter a non-ByteBuf Also add a TODO to consider creating a ByteBufBuffer, which might reduce the need copy bytes around. JIRA: NETCONF-1106 Change-Id: I1da71b1f5c9959a5a32a9ce39067d25464b785c2 Signed-off-by: Robert Varga --- ...etconfSubsystemOutboundChannelHandler.java | 55 +++++++++++++------ 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystemOutboundChannelHandler.java b/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystemOutboundChannelHandler.java index e723e7c7dc..c419ebd82e 100644 --- a/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystemOutboundChannelHandler.java +++ b/protocol/netconf-server/src/main/java/org/opendaylight/netconf/server/NetconfSubsystemOutboundChannelHandler.java @@ -15,6 +15,7 @@ import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import java.io.IOException; import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream; +import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture; import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,24 +35,42 @@ final class NetconfSubsystemOutboundChannelHandler extends ChannelOutboundHandle @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - if (msg instanceof ByteBuf byteBuf) { - // redirect channel outgoing packets to output stream linked to transport - final byte[] bytes = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(bytes); - try { - out.writeBuffer(new ByteArrayBuffer(bytes)) - .addListener(future -> { - if (future.isWritten()) { - byteBuf.release(); // report outbound message being handled - promise.setSuccess(); - } else if (future.getException() != null) { - LOG.error("Error writing buffer", future.getException()); - promise.setFailure(future.getException()); - } - }); - } catch (IOException e) { - LOG.error("Error writing buffer", e); - } + // redirect channel outgoing packets to output stream linked to transport + if (!(msg instanceof ByteBuf byteBuf)) { + LOG.trace("Ignoring unrecognized {}", msg == null ? null : msg.getClass()); + return; + } + + final var sshBuf = toSshBuffer(byteBuf); + final IoWriteFuture writeFuture; + try { + writeFuture = out.writeBuffer(sshBuf); + } catch (IOException e) { + LOG.error("Error writing buffer", e); + promise.setFailure(e); + return; } + + writeFuture.addListener(future -> { + if (future.isWritten()) { + // report outbound message being handled + promise.setSuccess(); + } else if (future.getException() != null) { + LOG.error("Error writing buffer", future.getException()); + promise.setFailure(future.getException()); + } + }); + } + + // TODO: This can amount to a lot of copying around. Is it worth our while to create a ByteBufBuffer, which + // would implement Buffer API on top a ByteBuf? + // If we decide to do that, we need to decide to interface with ByteBuf (readRetainedSlice() ?) and then + // release it only after the write has been resolved + private static ByteArrayBuffer toSshBuffer(final ByteBuf byteBuf) { + final var bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + // Netty buffer can be recycled now + byteBuf.release(); + return new ByteArrayBuffer(bytes); } } -- 2.36.6