2 * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.transport.ssh;
10 import com.google.errorprone.annotations.DoNotCall;
11 import io.netty.buffer.Unpooled;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import java.io.IOException;
15 import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
16 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
17 import org.opendaylight.netconf.transport.api.TransportChannel;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
22 * A {@link ChannelSubsystem} bound to a {@link SSHClient} and a Netty channel.
24 final class TransportClientSubsystem extends ChannelSubsystem {
25 private static final Logger LOG = LoggerFactory.getLogger(TransportClientSubsystem.class);
27 private ChannelHandlerContext pipelineHead;
29 TransportClientSubsystem(final String subsystem) {
31 setStreaming(Streaming.Async);
36 @DoNotCall("Always throws UnsupportedOperationException")
37 public OpenFuture open() throws IOException {
38 throw new UnsupportedOperationException();
41 synchronized OpenFuture open(final TransportChannel underlay) throws IOException {
42 LOG.debug("Opening client subsystem \"{}\"", getSubsystem());
43 final var openFuture = super.open();
44 openFuture.addListener(future -> onOpenComplete(future, underlay));
48 private void onOpenComplete(final OpenFuture future, final TransportChannel underlay) {
49 if (!future.isOpened()) {
50 LOG.debug("Failed to open client subsystem \"{}\"", getSubsystem(), future.getException());
54 // Note that there may be multiple handlers already present on the channel, hence we are attaching last, but
55 // from the logical perspective we are the head handlers.
56 final var pipeline = underlay.channel().pipeline();
58 // - install outbound packet handler, i.e. moving bytes from the channel into SSHD's pipeline
59 pipeline.addLast(new OutboundChannelHandler(getAsyncIn()));
60 // - remember the context of this handler, we will be using it to issue writes into the channel
61 pipelineHead = pipeline.lastContext();
63 // - install inner channel termination handler
64 pipeline.addLast(new ChannelInboundHandlerAdapter() {
66 public void channelInactive(final ChannelHandlerContext ctx) throws IOException {
73 protected void doWriteData(final byte[] data, final int off, final long len) throws IOException {
74 // If we're already closing, ignore incoming data
79 final int reqLen = (int) len;
81 LOG.debug("Forwarding {} bytes of data", reqLen);
82 pipelineHead.fireChannelRead(Unpooled.copiedBuffer(data, off, reqLen));
83 getLocalWindow().release(reqLen);
88 protected void doWriteExtendedData(final byte[] data, final int off, final long len) throws IOException {
89 // If we're already closing, ignore incoming data
93 LOG.debug("Discarding {} bytes of extended data", len);
95 getLocalWindow().release(len);