X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=netconf%2Fnetconf-netty-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Fnettyutil%2Fhandler%2FNetconfEOMAggregator.java;h=6b32c5883ee770cc65c14baa0834163d6fbdc3bc;hb=5c92f1730eafb59a22c6ba160eb8e87b9aee4d4a;hp=9c441404d82373d69b41ee302a6ea38cb36d2025;hpb=47c1b8e3d9835d336c79d6b4ca4e61417a05039e;p=netconf.git diff --git a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java index 9c441404d8..6b32c5883e 100644 --- a/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java +++ b/netconf/netconf-netty-util/src/main/java/org/opendaylight/netconf/nettyutil/handler/NetconfEOMAggregator.java @@ -5,18 +5,85 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.nettyutil.handler; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class NetconfEOMAggregator extends ByteToMessageDecoder { + private static final Logger LOG = LoggerFactory.getLogger(NetconfEOMAggregator.class); + + // Cached for brevity and constantness + private static final byte[] EOM = MessageParts.END_OF_MESSAGE; + private static final int EOM_LENGTH = EOM.length; + + // Number of input ByteBuf bytes known to not include the delimiter. + private int bodyLength = 0; + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List out) { + ByteBuf frame; + while ((frame = decodeFrame(ctx, in)) != null) { + out.add(frame); + } + } -public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder { + @VisibleForTesting + int bodyLength() { + return bodyLength; + } + + private ByteBuf decodeFrame(final ChannelHandlerContext ctx, final ByteBuf in) { + // Cache the details of input ByteBuf as they are invariants + final int readerIndex = in.readerIndex(); + final int writerIndex = in.writerIndex(); + + int searchIndex = readerIndex + bodyLength; + while (true) { + // Try to find the first EOM byte + final int eomIndex = in.indexOf(searchIndex, writerIndex, EOM[0]); + if (eomIndex == -1) { + // The first byte (']') is not present, everything in the buffer is part of the body + bodyLength = writerIndex - readerIndex; + return null; + } - public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(MessageParts.END_OF_MESSAGE); + // a.k.a. in.readableBytes() from the first EOM byte + final int readableBytes = writerIndex - eomIndex; + if (readableBytes < EOM_LENGTH) { + // Not enough bytes to contain a delimiter, bail out + LOG.trace("Context {} buffer {} has only {} new bytes", ctx, in, readableBytes); + bodyLength = eomIndex - readerIndex; + return null; + } + + // Check for EOM match + if (isEom(in, eomIndex)) { + final int frameLength = eomIndex - readerIndex; + LOG.debug("Context {} buffer {} frame detected: length {}", ctx, in, frameLength); + final var ret = in.readRetainedSlice(frameLength); + in.skipBytes(EOM_LENGTH); + bodyLength = 0; + return ret; + } + + // No match: move one byte past eomIndex and repeat + searchIndex = eomIndex + 1; + LOG.trace("Context {} buffer {} restart at {}", ctx, in, searchIndex); + } + } - public NetconfEOMAggregator() { - super(Integer.MAX_VALUE, DELIMITER); + private static boolean isEom(final ByteBuf in, final int index) { + for (int i = 1; i < EOM_LENGTH; ++i) { + if (in.getByte(index + i) != EOM[i]) { + return false; + } + } + return true; } }