2 * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.transport.ssh;
10 import static java.util.Objects.requireNonNull;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelOutboundHandler;
15 import io.netty.channel.ChannelOutboundHandlerAdapter;
16 import io.netty.channel.ChannelPromise;
17 import java.io.IOException;
18 import java.util.ArrayDeque;
19 import org.opendaylight.netconf.shaded.sshd.common.channel.ChannelAsyncOutputStream;
20 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
21 import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture;
22 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * A {@link ChannelOutboundHandler} responsible for redirecting whatever bytes need to be written out on the Netty
28 * channel so that they pass into SSHD's output.
31 * This class is specialized for {@link ChannelAsyncOutputStream} on purpose, as this handler is invoked from the Netty
32 * thread and we do not want to block those. We therefore rely on {@link ChannelAsyncOutputStream}'s single-async-write
33 * promise and perform queueing here.
35 final class OutboundChannelHandler extends ChannelOutboundHandlerAdapter {
36 // A write enqueued in pending queue
37 private record Write(ByteBuf buf, ChannelPromise promise) {
40 requireNonNull(promise);
44 private static final Logger LOG = LoggerFactory.getLogger(OutboundChannelHandler.class);
46 private final IoOutputStream out;
48 // write requests that need to be sent once currently-outstanding write completes
49 private ArrayDeque<Write> pending;
50 // indicates we have an outstanding write
51 private boolean writePending;
53 OutboundChannelHandler(final ChannelAsyncOutputStream out) {
54 this.out = requireNonNull(out);
58 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
59 // redirect channel outgoing packets to output stream linked to transport
60 if (msg instanceof ByteBuf buf) {
63 LOG.trace("Ignoring unrecognized {}", msg == null ? null : msg.getClass());
67 private void write(final ByteBuf buf, final ChannelPromise promise) {
69 LOG.trace("A write is already pending, delaying write");
70 delayWrite(buf, promise);
72 LOG.trace("Issuing immediate write");
73 startWrite(buf, promise);
77 private void delayWrite(final ByteBuf buf, final ChannelPromise promise) {
78 if (pending == null) {
79 // these are per-session, hence we want to start out small
80 pending = new ArrayDeque<>(1);
82 pending.addLast(new Write(buf, promise));
85 private void startWrite(final ByteBuf buf, final ChannelPromise promise) {
86 final var sshBuf = toSshBuffer(buf);
87 final IoWriteFuture writeFuture;
89 writeFuture = out.writeBuffer(sshBuf);
90 } catch (IOException e) {
91 failWrites(promise, e);
96 writeFuture.addListener(future -> finishWrite(future, promise));
99 private void finishWrite(final IoWriteFuture future, final ChannelPromise promise) {
100 writePending = false;
102 if (future.isWritten()) {
103 // report outbound message being handled
104 promise.setSuccess();
106 if (pending != null) {
107 // TODO: here we could be coalescing multiple ByteBufs into a single Buffer
108 final var next = pending.pollFirst();
110 LOG.trace("Issuing next write");
111 startWrite(next.buf, next.promise);
117 final var cause = future.getException();
119 failWrites(promise, cause);
123 private void failWrites(final ChannelPromise promise, final Throwable cause) {
124 LOG.error("Error writing buffer", cause);
125 promise.setFailure(cause);
127 // Cascade to all delayed messages
128 if (pending != null) {
129 pending.forEach(msg -> msg.promise.setFailure(cause));
134 // TODO: This can amount to a lot of copying around. Is it worth our while to create a ByteBufBuffer, which
135 // would implement Buffer API on top a ByteBuf?
136 // If we decide to do that, we need to decide to interface with ByteBuf (readRetainedSlice() ?) and then
137 // release it only after the write has been resolved
138 private static ByteArrayBuffer toSshBuffer(final ByteBuf byteBuf) {
139 final var bytes = new byte[byteBuf.readableBytes()];
140 byteBuf.readBytes(bytes);
141 // Netty buffer can be recycled now
143 return new ByteArrayBuffer(bytes);