cceef609b0d59a6311de828676621a39ced6ba2b
[netconf.git] / transport / transport-ssh / src / main / java / org / opendaylight / netconf / transport / ssh / OutboundChannelHandler.java
1 /*
2  * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.transport.ssh;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.netty.buffer.ByteBuf;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelOutboundHandler;
15 import io.netty.channel.ChannelOutboundHandlerAdapter;
16 import io.netty.channel.ChannelPromise;
17 import java.io.IOException;
18 import java.util.ArrayDeque;
19 import org.opendaylight.netconf.shaded.sshd.common.channel.ChannelAsyncOutputStream;
20 import org.opendaylight.netconf.shaded.sshd.common.io.IoOutputStream;
21 import org.opendaylight.netconf.shaded.sshd.common.io.IoWriteFuture;
22 import org.opendaylight.netconf.shaded.sshd.common.util.buffer.ByteArrayBuffer;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * A {@link ChannelOutboundHandler} responsible for redirecting whatever bytes need to be written out on the Netty
28  * channel so that they pass into SSHD's output.
29  *
30  * <p>
31  * This class is specialized for {@link ChannelAsyncOutputStream} on purpose, as this handler is invoked from the Netty
32  * thread and we do not want to block those. We therefore rely on {@link ChannelAsyncOutputStream}'s single-async-write
33  * promise and perform queueing here.
34  */
35 final class OutboundChannelHandler extends ChannelOutboundHandlerAdapter {
36     // A write enqueued in pending queue
37     private record Write(ByteBuf buf, ChannelPromise promise) {
38         Write {
39             requireNonNull(buf);
40             requireNonNull(promise);
41         }
42     }
43
44     private static final Logger LOG = LoggerFactory.getLogger(OutboundChannelHandler.class);
45
46     private final IoOutputStream out;
47
48     // write requests that need to be sent once currently-outstanding write completes
49     private ArrayDeque<Write> pending;
50     // indicates we have an outstanding write
51     private boolean writePending;
52
53     OutboundChannelHandler(final ChannelAsyncOutputStream out) {
54         this.out = requireNonNull(out);
55     }
56
57     @Override
58     public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
59         // redirect channel outgoing packets to output stream linked to transport
60         if (msg instanceof ByteBuf buf) {
61             write(buf, promise);
62         } else {
63             LOG.trace("Ignoring unrecognized {}", msg == null ? null : msg.getClass());
64         }
65     }
66
67     private void write(final ByteBuf buf, final ChannelPromise promise) {
68         if (writePending) {
69             LOG.trace("A write is already pending, delaying write");
70             delayWrite(buf, promise);
71         } else {
72             LOG.trace("Issuing immediate write");
73             startWrite(buf, promise);
74         }
75     }
76
77     private void delayWrite(final ByteBuf buf, final ChannelPromise promise) {
78         if (pending == null) {
79             // these are per-session, hence we want to start out small
80             pending = new ArrayDeque<>(1);
81         }
82         pending.addLast(new Write(buf, promise));
83     }
84
85     private void startWrite(final ByteBuf buf, final ChannelPromise promise) {
86         final var sshBuf = toSshBuffer(buf);
87         final IoWriteFuture writeFuture;
88         try {
89             writeFuture = out.writeBuffer(sshBuf);
90         } catch (IOException e) {
91             failWrites(promise, e);
92             return;
93         }
94
95         writePending = true;
96         writeFuture.addListener(future -> finishWrite(future, promise));
97     }
98
99     private void finishWrite(final IoWriteFuture future, final ChannelPromise promise) {
100         writePending = false;
101
102         if (future.isWritten()) {
103             // report outbound message being handled
104             promise.setSuccess();
105
106             if (pending != null) {
107                 // TODO: here we could be coalescing multiple ByteBufs into a single Buffer
108                 final var next = pending.pollFirst();
109                 if (next != null) {
110                     LOG.trace("Issuing next write");
111                     startWrite(next.buf, next.promise);
112                 }
113             }
114             return;
115         }
116
117         final var cause = future.getException();
118         if (cause != null) {
119             failWrites(promise, cause);
120         }
121     }
122
123     private void failWrites(final ChannelPromise promise, final Throwable cause) {
124         LOG.error("Error writing buffer", cause);
125         promise.setFailure(cause);
126
127         // Cascade to all delayed messages
128         if (pending != null) {
129             pending.forEach(msg -> msg.promise.setFailure(cause));
130             pending = null;
131         }
132     }
133
134     // TODO: This can amount to a lot of copying around. Is it worth our while to create a ByteBufBuffer, which
135     //       would implement Buffer API on top a ByteBuf?
136     //       If we decide to do that, we need to decide to interface with ByteBuf (readRetainedSlice() ?) and then
137     //       release it only after the write has been resolved
138     private static ByteArrayBuffer toSshBuffer(final ByteBuf byteBuf) {
139         final var bytes = new byte[byteBuf.readableBytes()];
140         byteBuf.readBytes(bytes);
141         // Netty buffer can be recycled now
142         byteBuf.release();
143         return new ByteArrayBuffer(bytes);
144     }
145 }