/* * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.netconf.transport.ssh; import static java.util.Objects.requireNonNull; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; import java.io.IOException; import java.util.ArrayDeque; import org.opendaylight.netconf.shaded.sshd.common.channel.ChannelAsyncOutputStream; import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream; import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture; import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A {@link ChannelOutboundHandler} responsible for redirecting whatever bytes need to be written out on the Netty * channel so that they pass into SSHD's output. * *

* This class is specialized for {@link ChannelAsyncOutputStream} on purpose, as this handler is invoked from the Netty * thread and we do not want to block those. We therefore rely on {@link ChannelAsyncOutputStream}'s single-async-write * promise and perform queueing here. */ final class OutboundChannelHandler extends ChannelOutboundHandlerAdapter { // A write enqueued in pending queue private record Write(ByteBuf buf, ChannelPromise promise) { Write { requireNonNull(buf); requireNonNull(promise); } } private static final Logger LOG = LoggerFactory.getLogger(OutboundChannelHandler.class); private final IoOutputStream out; // write requests that need to be sent once currently-outstanding write completes private ArrayDeque pending; // indicates we have an outstanding write private boolean writePending; OutboundChannelHandler(final ChannelAsyncOutputStream out) { this.out = requireNonNull(out); } @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { // redirect channel outgoing packets to output stream linked to transport if (msg instanceof ByteBuf buf) { write(buf, promise); } else { LOG.trace("Ignoring unrecognized {}", msg == null ? null : msg.getClass()); } } private void write(final ByteBuf buf, final ChannelPromise promise) { if (writePending) { LOG.trace("A write is already pending, delaying write"); delayWrite(buf, promise); } else { LOG.trace("Issuing immediate write"); startWrite(buf, promise); } } private void delayWrite(final ByteBuf buf, final ChannelPromise promise) { if (pending == null) { // these are per-session, hence we want to start out small pending = new ArrayDeque<>(1); } pending.addLast(new Write(buf, promise)); } private void startWrite(final ByteBuf buf, final ChannelPromise promise) { final var sshBuf = toSshBuffer(buf); final IoWriteFuture writeFuture; try { writeFuture = out.writeBuffer(sshBuf); } catch (IOException e) { failWrites(promise, e); return; } writePending = true; writeFuture.addListener(future -> finishWrite(future, promise)); } private void finishWrite(final IoWriteFuture future, final ChannelPromise promise) { writePending = false; if (future.isWritten()) { // report outbound message being handled promise.setSuccess(); if (pending != null) { // TODO: here we could be coalescing multiple ByteBufs into a single Buffer final var next = pending.pollFirst(); if (next != null) { LOG.trace("Issuing next write"); startWrite(next.buf, next.promise); } } return; } final var cause = future.getException(); if (cause != null) { failWrites(promise, cause); } } private void failWrites(final ChannelPromise promise, final Throwable cause) { LOG.error("Error writing buffer", cause); promise.setFailure(cause); // Cascade to all delayed messages if (pending != null) { pending.forEach(msg -> msg.promise.setFailure(cause)); pending = null; } } // TODO: This can amount to a lot of copying around. Is it worth our while to create a ByteBufBuffer, which // would implement Buffer API on top a ByteBuf? // If we decide to do that, we need to decide to interface with ByteBuf (readRetainedSlice() ?) and then // release it only after the write has been resolved private static ByteArrayBuffer toSshBuffer(final ByteBuf byteBuf) { final var bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); // Netty buffer can be recycled now byteBuf.release(); return new ByteArrayBuffer(bytes); } }