Require a subsystem for client connections
[netconf.git] / transport / transport-ssh / src / main / java / org / opendaylight / netconf / transport / ssh / TransportClientSubsystem.java
1 /*
2  * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.transport.ssh;
9
10 import com.google.errorprone.annotations.DoNotCall;
11 import io.netty.buffer.Unpooled;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import java.io.IOException;
15 import org.opendaylight.netconf.shaded.sshd.client.channel.ChannelSubsystem;
16 import org.opendaylight.netconf.shaded.sshd.client.future.OpenFuture;
17 import org.opendaylight.netconf.transport.api.TransportChannel;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 /**
22  * A {@link ChannelSubsystem} bound to a {@link SSHClient} and a Netty channel.
23  */
24 final class TransportClientSubsystem extends ChannelSubsystem {
25     private static final Logger LOG = LoggerFactory.getLogger(TransportClientSubsystem.class);
26
27     private ChannelHandlerContext pipelineHead;
28
29     TransportClientSubsystem(final String subsystem) {
30         super(subsystem);
31         setStreaming(Streaming.Async);
32     }
33
34     @Override
35     @Deprecated
36     @DoNotCall("Always throws UnsupportedOperationException")
37     public OpenFuture open() throws IOException {
38         throw new UnsupportedOperationException();
39     }
40
41     synchronized OpenFuture open(final TransportChannel underlay) throws IOException {
42         LOG.debug("Opening client subsystem \"{}\"", getSubsystem());
43         final var openFuture = super.open();
44         openFuture.addListener(future -> onOpenComplete(future, underlay));
45         return openFuture;
46     }
47
48     private void onOpenComplete(final OpenFuture future, final TransportChannel underlay) {
49         if (!future.isOpened()) {
50             LOG.debug("Failed to open client subsystem \"{}\"", getSubsystem(), future.getException());
51             return;
52         }
53
54         // Note that there may be multiple handlers already present on the channel, hence we are attaching last, but
55         // from the logical perspective we are the head handlers.
56         final var pipeline = underlay.channel().pipeline();
57
58         // - install outbound packet handler, i.e. moving bytes from the channel into SSHD's pipeline
59         pipeline.addLast(new OutboundChannelHandler(getAsyncIn()));
60         // - remember the context of this handler, we will be using it to issue writes into the channel
61         pipelineHead = pipeline.lastContext();
62
63         // - install inner channel termination handler
64         pipeline.addLast(new ChannelInboundHandlerAdapter() {
65             @Override
66             public void channelInactive(final ChannelHandlerContext ctx) throws IOException {
67                 close();
68             }
69         });
70     }
71
72     @Override
73     protected void doWriteData(final byte[] data, final int off, final long len) throws IOException {
74         // If we're already closing, ignore incoming data
75         if (isClosing()) {
76             return;
77         }
78
79         final int reqLen = (int) len;
80         if (reqLen > 0) {
81             LOG.debug("Forwarding {} bytes of data", reqLen);
82             pipelineHead.fireChannelRead(Unpooled.copiedBuffer(data, off, reqLen));
83             getLocalWindow().release(reqLen);
84         }
85     }
86
87     @Override
88     protected void doWriteExtendedData(final byte[] data, final int off, final long len) throws IOException {
89         // If we're already closing, ignore incoming data
90         if (isClosing()) {
91             return;
92         }
93         LOG.debug("Discarding {} bytes of extended data", len);
94         if (len > 0) {
95             getLocalWindow().release(len);
96         }
97     }
98 }