From: Jakub Morvay Date: Thu, 16 Aug 2018 12:02:02 +0000 (+0200) Subject: Get rid of Netty's DelimiterBasedFrameDecoder X-Git-Tag: v4.0.0~3 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=netconf.git;a=commitdiff_plain;h=5c92f1730eafb59a22c6ba160eb8e87b9aee4d4a Get rid of Netty's DelimiterBasedFrameDecoder Netty's DelimiterBasedFrameDecoder is not really effective when dealing with huge NETCONF messages and reading them in just really small parts at a time. DelimiterBasedFrameDecoder always searches whole input ByteBuf for delimiter when new input bytes are available. It performs the search also on bytes already checked in previous tries. Obviously, this can be really ineffective when reading large messages by very small parts at a time. Replace Netty's DelimiterBasedFrameDecoder by our own frame detection implementation. The implementation remembers already checked part of the input ByteBuf and searches the delimiter just in a new bytes available. JIRA: NETCONF-889 Change-Id: If5e78f4373d767f4cf465024313eeac873c1621d Signed-off-by: Jakub Morvay Signed-off-by: Maros Marsalek Signed-off-by: Sangwook Ha Signed-off-by: Robert Varga --- diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java index 9c441404d8..6b32c5883e 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java @@ -5,18 +5,85 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.nettyutil.handler; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class NetconfEOMAggregator extends ByteToMessageDecoder { + private static final Logger LOG = LoggerFactory.getLogger(NetconfEOMAggregator.class); + + // Cached for brevity and constantness + private static final byte[] EOM = MessageParts.END_OF_MESSAGE; + private static final int EOM_LENGTH = EOM.length; + + // Number of input ByteBuf bytes known to not include the delimiter. + private int bodyLength = 0; + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List out) { + ByteBuf frame; + while ((frame = decodeFrame(ctx, in)) != null) { + out.add(frame); + } + } -public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder { + @VisibleForTesting + int bodyLength() { + return bodyLength; + } + + private ByteBuf decodeFrame(final ChannelHandlerContext ctx, final ByteBuf in) { + // Cache the details of input ByteBuf as they are invariants + final int readerIndex = in.readerIndex(); + final int writerIndex = in.writerIndex(); + + int searchIndex = readerIndex + bodyLength; + while (true) { + // Try to find the first EOM byte + final int eomIndex = in.indexOf(searchIndex, writerIndex, EOM[0]); + if (eomIndex == -1) { + // The first byte (']') is not present, everything in the buffer is part of the body + bodyLength = writerIndex - readerIndex; + return null; + } - public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(MessageParts.END_OF_MESSAGE); + // a.k.a. in.readableBytes() from the first EOM byte + final int readableBytes = writerIndex - eomIndex; + if (readableBytes < EOM_LENGTH) { + // Not enough bytes to contain a delimiter, bail out + LOG.trace("Context {} buffer {} has only {} new bytes", ctx, in, readableBytes); + bodyLength = eomIndex - readerIndex; + return null; + } + + // Check for EOM match + if (isEom(in, eomIndex)) { + final int frameLength = eomIndex - readerIndex; + LOG.debug("Context {} buffer {} frame detected: length {}", ctx, in, frameLength); + final var ret = in.readRetainedSlice(frameLength); + in.skipBytes(EOM_LENGTH); + bodyLength = 0; + return ret; + } + + // No match: move one byte past eomIndex and repeat + searchIndex = eomIndex + 1; + LOG.trace("Context {} buffer {} restart at {}", ctx, in, searchIndex); + } + } - public NetconfEOMAggregator() { - super(Integer.MAX_VALUE, DELIMITER); + private static boolean isEom(final ByteBuf in, final int index) { + for (int i = 1; i < EOM_LENGTH; ++i) { + if (in.getByte(index + i) != EOM[i]) { + return false; + } + } + return true; } } diff --git a/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregatorTest.java b/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregatorTest.java new file mode 100644 index 0000000000..e2d5f8da3d --- /dev/null +++ b/netconf/netconf-netty-util/src/test/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregatorTest.java @@ -0,0 +1,230 @@ +/* + * Copyright (c) 2018 FRINX s.r.o., and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.nettyutil.handler; + +import static org.junit.Assert.assertEquals; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.Charset; +import java.util.LinkedList; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +public class NetconfEOMAggregatorTest { + + private static final String COMM_1 = "\n" + + "\n" + + "\n" + + "\n" + + "rootsuperuser\n" + + "fredadmin\n" + + "barneyadmin\n" + + "\n" + + "\n" + + "\n" + + "]]>]]>" + + "\n" + + "\n" + + "\n" + + "\n" + + "rootsuperuser\n" + + "fredadmin\n" + + "barneyadmin\n" + + "joeuser\n" + + "\n" + + "\n" + + "\n" + + "]]>]]>"; + + private static final String COMM_1_M_1 = "\n" + + "\n" + + "\n" + + "\n" + + "rootsuperuser\n" + + "fredadmin\n" + + "barneyadmin\n" + + "\n" + + "\n" + + "\n"; + private static final String COMM_1_M_2 = "\n" + + "\n" + + "\n" + + "\n" + + "rootsuperuser\n" + + "fredadmin\n" + + "barneyadmin\n" + + "joeuser\n" + + "\n" + + "\n" + + "\n"; + + private static final String COMM_2 = "\n" + + "\n" + + "\n" + + "\n" + + "porsche\n" + + "ford\n" + + "\n" + + "\n" + + "\n" + + "]]>]]>"; + + private static final String COMM_2_M_1 = "\n" + + "\n" + + "\n" + + "\n" + + "porsche\n" + + "ford\n" + + "\n" + + "\n" + + "\n"; + + private static final String COMM_3_S_1 = "\n" + + "\n" + + "\n" + + "\n" + + "rootsuperuser\n" + + "fredadmin\n" + + "barneyadmin\n" + + "\n" + + "\n" + + "\n" + + "]]>]]>"; + private static final String COMM_3_S_2 = "\n" + + "\n" + + "\n" + + "\n"; + private static final String COMM_3_S_3 = "porsche\n" + + "ford\n" + + "\n" + + "\n" + + "\n" + + "]]>]]>"; + + private static final String COMM_3_M_1 = "\n" + + "\n" + + "\n" + + "\n" + + "rootsuperuser\n" + + "fredadmin\n" + + "barneyadmin\n" + + "\n" + + "\n" + + "\n"; + private static final String COMM_3_M_2 = "\n" + + "\n" + + "\n" + + "\n" + + "porsche\n" + + "ford\n" + + "\n" + + "\n" + + "\n"; + + private static NetconfEOMAggregator aggregator; + + @Before + public void setUp() throws Exception { + aggregator = new NetconfEOMAggregator(); + } + + @Test + public void testDecodeMessagesReadAtOnce() { + final ByteBuf in = Unpooled.copiedBuffer(COMM_1.getBytes()); + final List out = new LinkedList<>(); + + aggregator.decode(null, in, out); + assertEquals(2, out.size()); + assertEquals(COMM_1_M_1, byteBufToString((ByteBuf) out.get(0))); + assertEquals(COMM_1_M_2, byteBufToString((ByteBuf) out.get(1))); + } + + @Test + public void testDecodeMessagesReadByteByByte() { + final ByteBuf in = Unpooled.buffer(); + final List out = new LinkedList<>(); + + for (final byte b : COMM_1.getBytes()) { + in.writeByte(b); + aggregator.decode(null, in, out); + } + + assertEquals(2, out.size()); + assertEquals(COMM_1_M_1, byteBufToString((ByteBuf) out.get(0))); + assertEquals(COMM_1_M_2, byteBufToString((ByteBuf) out.get(1))); + } + + @Test + public void testDecodeMultipleStreams() { + final ByteBuf in = Unpooled.copiedBuffer(COMM_1.getBytes()); + final List out = new LinkedList<>(); + + aggregator.decode(null, in, out); + assertEquals(2, out.size()); + assertEquals(COMM_1_M_1, byteBufToString((ByteBuf) out.get(0))); + assertEquals(COMM_1_M_2, byteBufToString((ByteBuf) out.get(1))); + + final ByteBuf in2 = Unpooled.copiedBuffer(COMM_2.getBytes()); + aggregator.decode(null, in2, out); + assertEquals(3, out.size()); + assertEquals(COMM_2_M_1, byteBufToString((ByteBuf) out.get(2))); + } + + @Test + public void testDecodeBufferReset() { + final ByteBuf in = Unpooled.buffer(); + final List out = new LinkedList<>(); + + in.writeBytes((COMM_3_S_1 + COMM_3_S_2).getBytes()); + + aggregator.decode(null, in, out); + assertEquals(1, out.size()); + assertEquals(COMM_3_M_1, byteBufToString((ByteBuf) out.get(0))); + + aggregator.decode(null, in, out); + assertEquals(1, out.size()); + + in.clear(); + in.writeBytes((COMM_3_S_2 + COMM_3_S_3).getBytes()); + + aggregator.decode(null, in, out); + assertEquals(2, out.size()); + assertEquals(COMM_3_M_2, byteBufToString((ByteBuf) out.get(1))); + } + + @Test + public void testDecodeEmptyMessage() { + final ByteBuf in = Unpooled.buffer(); + final List out = new LinkedList<>(); + + for (final byte b : MessageParts.END_OF_MESSAGE) { + in.writeByte(b); + aggregator.decode(null, in, out); + assertEquals(0, aggregator.bodyLength()); + } + + assertEquals(1, out.size()); + assertEquals("", byteBufToString((ByteBuf) out.get(0))); + } + + private static String byteBufToString(final ByteBuf byteBuf) { + return byteBuf.toString(Charset.defaultCharset()); + } +}