Fix frame encoder for netconf server
[netconf.git] / protocol / netconf-server / src / main / java / org / opendaylight / netconf / server / NetconfSubsystemFactory.java
index 1e6ac1f4366fd61b35d67628fcb3ee2ef125a3cc..789a19d542cc46bec621995dc6117ebc003e470c 100644 (file)
@@ -11,9 +11,10 @@ import static java.util.Objects.requireNonNull;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.GlobalEventExecutor;
@@ -57,7 +58,7 @@ public final class NetconfSubsystemFactory implements SubsystemFactory {
         private static final Logger LOG = LoggerFactory.getLogger(NetconfSubsystem.class);
 
         private final ServerChannelInitializer channelInitializer;
-        private Channel innerChannel;
+        private EmbeddedChannel innerChannel;
         private IoOutputStream ioOutputStream;
         private ChannelSession channelSession;
 
@@ -94,50 +95,51 @@ public final class NetconfSubsystemFactory implements SubsystemFactory {
              * the inner channel is used to serve NETCONF over SSH.
              */
 
-            final var embeddedChannel = new EmbeddedChannel() {
-                @Override
-                protected void handleOutboundMessage(final Object msg) {
-                    if (msg instanceof ByteBuf byteBuf) {
-                        // redirect channel outgoing packets to output stream linked to transport
-                        final byte[] bytes = new byte[byteBuf.readableBytes()];
-                        byteBuf.readBytes(bytes);
-                        try {
-                            ioOutputStream.writeBuffer(new ByteArrayBuffer(bytes))
-                                .addListener(future -> {
-                                    if (future.isWritten()) {
-                                        byteBuf.release(); // report outbound message being handled
-                                    } else if (future.getException() != null) {
-                                        LOG.debug("Error writing buffer", future.getException());
-                                    }
-                                });
-                        } catch (IOException e) {
-                            LOG.error("Error writing buffer", e);
-                        }
-                    } else {
-                        // non-ByteBuf messages are persisted within channel for subsequent handling
-                        super.handleOutboundMessage(msg);
-                    }
-                }
-            };
-
-            this.innerChannel = embeddedChannel;
+            this.innerChannel = new EmbeddedChannel();
 
             // inbound packets handler
             channelSession.setDataReceiver(new ChannelDataReceiver() {
                 @Override
                 public int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException {
-                    embeddedChannel.writeInbound(Unpooled.copiedBuffer(buf, start, len));
+                    innerChannel.writeInbound(Unpooled.copiedBuffer(buf, start, len));
                     return len;
                 }
 
                 @Override
                 public void close() throws IOException {
-                    embeddedChannel.close();
+                    innerChannel.close();
                 }
             });
 
+            // outbound packet handler, adding fist means it will be invoked last bc of flow direction
+            innerChannel.pipeline().addFirst(
+                new ChannelOutboundHandlerAdapter() {
+                    @Override
+                    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
+                        if (msg instanceof ByteBuf byteBuf) {
+                            // redirect channel outgoing packets to output stream linked to transport
+                            final byte[] bytes = new byte[byteBuf.readableBytes()];
+                            byteBuf.readBytes(bytes);
+                            try {
+                                ioOutputStream.writeBuffer(new ByteArrayBuffer(bytes))
+                                    .addListener(future -> {
+                                        if (future.isWritten()) {
+                                            byteBuf.release(); // report outbound message being handled
+                                            promise.setSuccess();
+                                        } else if (future.getException() != null) {
+                                            LOG.error("Error writing buffer", future.getException());
+                                            promise.setFailure(future.getException());
+                                        }
+                                    });
+                            } catch (IOException e) {
+                                LOG.error("Error writing buffer", e);
+                            }
+                        }
+                    }
+                });
+
             // inner channel termination handler
-            embeddedChannel.pipeline().addFirst(
+            innerChannel.pipeline().addLast(
                 new ChannelInboundHandlerAdapter() {
                     @Override
                     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@@ -146,12 +148,12 @@ public final class NetconfSubsystemFactory implements SubsystemFactory {
                 }
             );
 
-            // NETCONF handlers
-            channelInitializer.initialize(embeddedChannel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
+            // NETCONF protocol handlers
+            channelInitializer.initialize(innerChannel, new DefaultPromise<>(GlobalEventExecutor.INSTANCE));
             // trigger negotiation flow
-            embeddedChannel.pipeline().fireChannelActive();
-            // set additional info for netconf session
-            embeddedChannel.writeInbound(Unpooled.wrappedBuffer(getHelloAdditionalMessageBytes()));
+            innerChannel.pipeline().fireChannelActive();
+            // set additional info for upcoming netconf session
+            innerChannel.writeInbound(Unpooled.wrappedBuffer(getHelloAdditionalMessageBytes()));
         }
 
         @Override