Get rid of Netty's DelimiterBasedFrameDecoder 90/75490/15
authorJakub Morvay <jmorvay@frinx.io>
Thu, 16 Aug 2018 12:02:02 +0000 (14:02 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 29 Jul 2022 09:03:21 +0000 (11:03 +0200)
Netty's DelimiterBasedFrameDecoder is not really effective when dealing
with huge NETCONF messages and reading them in just really small parts
at a time.

DelimiterBasedFrameDecoder always searches whole input ByteBuf for
delimiter when new input bytes are available. It performs the search
also on bytes already checked in previous tries. Obviously, this can be
really ineffective when reading large messages by very small parts at
a time.

Replace Netty's DelimiterBasedFrameDecoder by our own frame detection
implementation. The implementation remembers already checked part of
the input ByteBuf and searches the delimiter just in a new bytes
available.

JIRA: NETCONF-889
Change-Id: If5e78f4373d767f4cf465024313eeac873c1621d
Signed-off-by: Jakub Morvay <jmorvay@frinx.io>
Signed-off-by: Maros Marsalek <mmarsalek@frinx.io>
Signed-off-by: Sangwook Ha <sangwook.ha@verizon.com>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java
netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregatorTest.java [new file with mode: 0644]

index 9c441404d82373d69b41ee302a6ea38cb36d2025..6b32c5883ee770cc65c14baa0834163d6fbdc3bc 100644 (file)
@@ -5,18 +5,85 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.nettyutil.handler;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NetconfEOMAggregator extends ByteToMessageDecoder {
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfEOMAggregator.class);
+
+    // Cached for brevity and constantness
+    private static final byte[] EOM = MessageParts.END_OF_MESSAGE;
+    private static final int EOM_LENGTH = EOM.length;
+
+    // Number of input ByteBuf bytes known to not include the delimiter.
+    private int bodyLength = 0;
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
+        ByteBuf frame;
+        while ((frame = decodeFrame(ctx, in)) != null) {
+            out.add(frame);
+        }
+    }
 
-public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder {
+    @VisibleForTesting
+    int bodyLength() {
+        return bodyLength;
+    }
+
+    private ByteBuf decodeFrame(final ChannelHandlerContext ctx, final ByteBuf in) {
+        // Cache the details of input ByteBuf as they are invariants
+        final int readerIndex = in.readerIndex();
+        final int writerIndex = in.writerIndex();
+
+        int searchIndex = readerIndex + bodyLength;
+        while (true) {
+            // Try to find the first EOM byte
+            final int eomIndex = in.indexOf(searchIndex, writerIndex, EOM[0]);
+            if (eomIndex == -1) {
+                // The first byte (']') is not present, everything in the buffer is part of the body
+                bodyLength = writerIndex - readerIndex;
+                return null;
+            }
 
-    public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(MessageParts.END_OF_MESSAGE);
+            // a.k.a. in.readableBytes() from the first EOM byte
+            final int readableBytes = writerIndex - eomIndex;
+            if (readableBytes < EOM_LENGTH) {
+                // Not enough bytes to contain a delimiter, bail out
+                LOG.trace("Context {} buffer {} has only {} new bytes", ctx, in, readableBytes);
+                bodyLength = eomIndex - readerIndex;
+                return null;
+            }
+
+            // Check for EOM match
+            if (isEom(in, eomIndex)) {
+                final int frameLength = eomIndex - readerIndex;
+                LOG.debug("Context {} buffer {} frame detected: length {}", ctx, in, frameLength);
+                final var ret = in.readRetainedSlice(frameLength);
+                in.skipBytes(EOM_LENGTH);
+                bodyLength = 0;
+                return ret;
+            }
+
+            // No match: move one byte past eomIndex and repeat
+            searchIndex = eomIndex + 1;
+            LOG.trace("Context {} buffer {} restart at {}", ctx, in, searchIndex);
+        }
+    }
 
-    public NetconfEOMAggregator() {
-        super(Integer.MAX_VALUE, DELIMITER);
+    private static boolean isEom(final ByteBuf in, final int index) {
+        for (int i = 1; i < EOM_LENGTH; ++i) {
+            if (in.getByte(index + i) != EOM[i]) {
+                return false;
+            }
+        }
+        return true;
     }
 }
diff --git a/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregatorTest.java b/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregatorTest.java
new file mode 100644 (file)
index 0000000..e2d5f8d
--- /dev/null
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2018 FRINX s.r.o., and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.nettyutil.handler;
+
+import static org.junit.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NetconfEOMAggregatorTest {
+
+    private static final String COMM_1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"105\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<users>\n"
+            + "<user><name>root</name><type>superuser</type></user>\n"
+            + "<user><name>fred</name><type>admin</type></user>\n"
+            + "<user><name>barney</name><type>admin</type></user>\n"
+            + "</users>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n"
+            + "]]>]]>"
+            + "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"106\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<users>\n"
+            + "<user><name>root</name><type>superuser</type></user>\n"
+            + "<user><name>fred</name><type>admin</type></user>\n"
+            + "<user><name>barney</name><type>admin</type></user>\n"
+            + "<user><name>joe</name><type>user</type></user>\n"
+            + "</users>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n"
+            + "]]>]]>";
+
+    private static final String COMM_1_M_1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"105\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<users>\n"
+            + "<user><name>root</name><type>superuser</type></user>\n"
+            + "<user><name>fred</name><type>admin</type></user>\n"
+            + "<user><name>barney</name><type>admin</type></user>\n"
+            + "</users>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n";
+    private static final String COMM_1_M_2 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"106\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<users>\n"
+            + "<user><name>root</name><type>superuser</type></user>\n"
+            + "<user><name>fred</name><type>admin</type></user>\n"
+            + "<user><name>barney</name><type>admin</type></user>\n"
+            + "<user><name>joe</name><type>user</type></user>\n"
+            + "</users>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n";
+
+    private static final String COMM_2 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"107\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<cars>\n"
+            + "<car><name>porsche</name></car>\n"
+            + "<car><name>ford</name></car>\n"
+            + "</cars>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n"
+            + "]]>]]>";
+
+    private static final String COMM_2_M_1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"107\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<cars>\n"
+            + "<car><name>porsche</name></car>\n"
+            + "<car><name>ford</name></car>\n"
+            + "</cars>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n";
+
+    private static final String COMM_3_S_1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"105\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<users>\n"
+            + "<user><name>root</name><type>superuser</type></user>\n"
+            + "<user><name>fred</name><type>admin</type></user>\n"
+            + "<user><name>barney</name><type>admin</type></user>\n"
+            + "</users>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n"
+            + "]]>]]>";
+    private static final String COMM_3_S_2 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"107\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<cars>\n";
+    private static final String COMM_3_S_3 = "<car><name>porsche</name></car>\n"
+            + "<car><name>ford</name></car>\n"
+            + "</cars>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n"
+            + "]]>]]>";
+
+    private static final String COMM_3_M_1 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"105\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<users>\n"
+            + "<user><name>root</name><type>superuser</type></user>\n"
+            + "<user><name>fred</name><type>admin</type></user>\n"
+            + "<user><name>barney</name><type>admin</type></user>\n"
+            + "</users>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n";
+    private static final String COMM_3_M_2 = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+            + "<rpc-reply message-id=\"107\"\n"
+            + "xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
+            + "<config xmlns=\"http://example.com/schema/1.2/config\">\n"
+            + "<cars>\n"
+            + "<car><name>porsche</name></car>\n"
+            + "<car><name>ford</name></car>\n"
+            + "</cars>\n"
+            + "</config>\n"
+            + "</rpc-reply>\n";
+
+    private static NetconfEOMAggregator aggregator;
+
+    @Before
+    public void setUp() throws Exception {
+        aggregator = new NetconfEOMAggregator();
+    }
+
+    @Test
+    public void testDecodeMessagesReadAtOnce() {
+        final ByteBuf in = Unpooled.copiedBuffer(COMM_1.getBytes());
+        final List<Object> out = new LinkedList<>();
+
+        aggregator.decode(null, in, out);
+        assertEquals(2, out.size());
+        assertEquals(COMM_1_M_1, byteBufToString((ByteBuf) out.get(0)));
+        assertEquals(COMM_1_M_2, byteBufToString((ByteBuf) out.get(1)));
+    }
+
+    @Test
+    public void testDecodeMessagesReadByteByByte() {
+        final ByteBuf in = Unpooled.buffer();
+        final List<Object> out = new LinkedList<>();
+
+        for (final byte b : COMM_1.getBytes()) {
+            in.writeByte(b);
+            aggregator.decode(null, in, out);
+        }
+
+        assertEquals(2, out.size());
+        assertEquals(COMM_1_M_1, byteBufToString((ByteBuf) out.get(0)));
+        assertEquals(COMM_1_M_2, byteBufToString((ByteBuf) out.get(1)));
+    }
+
+    @Test
+    public void testDecodeMultipleStreams() {
+        final ByteBuf in = Unpooled.copiedBuffer(COMM_1.getBytes());
+        final List<Object> out = new LinkedList<>();
+
+        aggregator.decode(null, in, out);
+        assertEquals(2, out.size());
+        assertEquals(COMM_1_M_1, byteBufToString((ByteBuf) out.get(0)));
+        assertEquals(COMM_1_M_2, byteBufToString((ByteBuf) out.get(1)));
+
+        final ByteBuf in2 = Unpooled.copiedBuffer(COMM_2.getBytes());
+        aggregator.decode(null, in2, out);
+        assertEquals(3, out.size());
+        assertEquals(COMM_2_M_1, byteBufToString((ByteBuf) out.get(2)));
+    }
+
+    @Test
+    public void testDecodeBufferReset() {
+        final ByteBuf in = Unpooled.buffer();
+        final List<Object> out = new LinkedList<>();
+
+        in.writeBytes((COMM_3_S_1 + COMM_3_S_2).getBytes());
+
+        aggregator.decode(null, in, out);
+        assertEquals(1, out.size());
+        assertEquals(COMM_3_M_1, byteBufToString((ByteBuf) out.get(0)));
+
+        aggregator.decode(null, in, out);
+        assertEquals(1, out.size());
+
+        in.clear();
+        in.writeBytes((COMM_3_S_2 + COMM_3_S_3).getBytes());
+
+        aggregator.decode(null, in, out);
+        assertEquals(2, out.size());
+        assertEquals(COMM_3_M_2, byteBufToString((ByteBuf) out.get(1)));
+    }
+
+    @Test
+    public void testDecodeEmptyMessage() {
+        final ByteBuf in = Unpooled.buffer();
+        final List<Object> out = new LinkedList<>();
+
+        for (final byte b : MessageParts.END_OF_MESSAGE) {
+            in.writeByte(b);
+            aggregator.decode(null, in, out);
+            assertEquals(0, aggregator.bodyLength());
+        }
+
+        assertEquals(1, out.size());
+        assertEquals("", byteBufToString((ByteBuf) out.get(0)));
+    }
+
+    private static String byteBufToString(final ByteBuf byteBuf) {
+        return byteBuf.toString(Charset.defaultCharset());
+    }
+}