Get rid of Netty's DelimiterBasedFrameDecoder
[netconf.git] / netconf / netconf-netty-util / src / main / java / org / opendaylight / netconf / nettyutil / handler / NetconfEOMAggregator.java
index 9c441404d82373d69b41ee302a6ea38cb36d2025..6b32c5883ee770cc65c14baa0834163d6fbdc3bc 100644 (file)
@@ -5,18 +5,85 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.netconf.nettyutil.handler;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NetconfEOMAggregator extends ByteToMessageDecoder {
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfEOMAggregator.class);
+
+    // Cached for brevity and constantness
+    private static final byte[] EOM = MessageParts.END_OF_MESSAGE;
+    private static final int EOM_LENGTH = EOM.length;
+
+    // Number of input ByteBuf bytes known to not include the delimiter.
+    private int bodyLength = 0;
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) {
+        ByteBuf frame;
+        while ((frame = decodeFrame(ctx, in)) != null) {
+            out.add(frame);
+        }
+    }
 
-public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder {
+    @VisibleForTesting
+    int bodyLength() {
+        return bodyLength;
+    }
+
+    private ByteBuf decodeFrame(final ChannelHandlerContext ctx, final ByteBuf in) {
+        // Cache the details of input ByteBuf as they are invariants
+        final int readerIndex = in.readerIndex();
+        final int writerIndex = in.writerIndex();
+
+        int searchIndex = readerIndex + bodyLength;
+        while (true) {
+            // Try to find the first EOM byte
+            final int eomIndex = in.indexOf(searchIndex, writerIndex, EOM[0]);
+            if (eomIndex == -1) {
+                // The first byte (']') is not present, everything in the buffer is part of the body
+                bodyLength = writerIndex - readerIndex;
+                return null;
+            }
 
-    public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(MessageParts.END_OF_MESSAGE);
+            // a.k.a. in.readableBytes() from the first EOM byte
+            final int readableBytes = writerIndex - eomIndex;
+            if (readableBytes < EOM_LENGTH) {
+                // Not enough bytes to contain a delimiter, bail out
+                LOG.trace("Context {} buffer {} has only {} new bytes", ctx, in, readableBytes);
+                bodyLength = eomIndex - readerIndex;
+                return null;
+            }
+
+            // Check for EOM match
+            if (isEom(in, eomIndex)) {
+                final int frameLength = eomIndex - readerIndex;
+                LOG.debug("Context {} buffer {} frame detected: length {}", ctx, in, frameLength);
+                final var ret = in.readRetainedSlice(frameLength);
+                in.skipBytes(EOM_LENGTH);
+                bodyLength = 0;
+                return ret;
+            }
+
+            // No match: move one byte past eomIndex and repeat
+            searchIndex = eomIndex + 1;
+            LOG.trace("Context {} buffer {} restart at {}", ctx, in, searchIndex);
+        }
+    }
 
-    public NetconfEOMAggregator() {
-        super(Integer.MAX_VALUE, DELIMITER);
+    private static boolean isEom(final ByteBuf in, final int index) {
+        for (int i = 1; i < EOM_LENGTH; ++i) {
+            if (in.getByte(index + i) != EOM[i]) {
+                return false;
+            }
+        }
+        return true;
     }
 }