import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
private static final Logger LOG = LoggerFactory.getLogger(NetconfSubsystem.class);
private final ServerChannelInitializer channelInitializer;
- private Channel innerChannel;
+ private EmbeddedChannel innerChannel;
private IoOutputStream ioOutputStream;
private ChannelSession channelSession;
* the inner channel is used to serve NETCONF over SSH.
*/
- final var embeddedChannel = new EmbeddedChannel() {
- @Override
- protected void handleOutboundMessage(final Object msg) {
- if (msg instanceof ByteBuf byteBuf) {
- // redirect channel outgoing packets to output stream linked to transport
- final byte[] bytes = new byte[byteBuf.readableBytes()];
- byteBuf.readBytes(bytes);
- try {
- ioOutputStream.writeBuffer(new ByteArrayBuffer(bytes))
- .addListener(future -> {
- if (future.isWritten()) {
- byteBuf.release(); // report outbound message being handled
- } else if (future.getException() != null) {
- LOG.debug("Error writing buffer", future.getException());
- }
- });
- } catch (IOException e) {
- LOG.error("Error writing buffer", e);
- }
- } else {
- // non-ByteBuf messages are persisted within channel for subsequent handling
- super.handleOutboundMessage(msg);
- }
- }
- };
-
- this.innerChannel = embeddedChannel;
+ this.innerChannel = new EmbeddedChannel();
// inbound packets handler
channelSession.setDataReceiver(new ChannelDataReceiver() {
@Override
public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException {
- embeddedChannel.writeInbound(Unpooled.copiedBuffer(buf, start, len));
+ innerChannel.writeInbound(Unpooled.copiedBuffer(buf, start, len));
return len;
}
@Override
public void close() throws IOException {
- embeddedChannel.close();
+ innerChannel.close();
}
});
+ // outbound packet handler, adding fist means it will be invoked last bc of flow direction
+ innerChannel.pipeline().addFirst(
+ new ChannelOutboundHandlerAdapter() {
+ @Override
+ public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
+ if (msg instanceof ByteBuf byteBuf) {
+ // redirect channel outgoing packets to output stream linked to transport
+ final byte[] bytes = new byte[byteBuf.readableBytes()];
+ byteBuf.readBytes(bytes);
+ try {
+ ioOutputStream.writeBuffer(new ByteArrayBuffer(bytes))
+ .addListener(future -> {
+ if (future.isWritten()) {
+ byteBuf.release(); // report outbound message being handled
+ promise.setSuccess();
+ } else if (future.getException() != null) {
+ LOG.error("Error writing buffer", future.getException());
+ promise.setFailure(future.getException());
+ }
+ });
+ } catch (IOException e) {
+ LOG.error("Error writing buffer", e);
+ }
+ }
+ }
+ });
+
// inner channel termination handler
- embeddedChannel.pipeline().addFirst(
+ innerChannel.pipeline().addLast(
new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
);
- // NETCONF handlers
- channelInitializer.initialize(embeddedChannel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
+ // NETCONF protocol handlers
+ channelInitializer.initialize(innerChannel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
// trigger negotiation flow
- embeddedChannel.pipeline().fireChannelActive();
- // set additional info for netconf session
- embeddedChannel.writeInbound(Unpooled.wrappedBuffer(getHelloAdditionalMessageBytes()));
+ innerChannel.pipeline().fireChannelActive();
+ // set additional info for upcoming netconf session
+ innerChannel.writeInbound(Unpooled.wrappedBuffer(getHelloAdditionalMessageBytes()));
}
@Override