Fix message aggregator for chunked netconf messages.
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / NetconfChunkAggregator.java
index f7045c3b301cb343722a1ab00d0554289e45fb5a..219e92c3f14053c805d1ac204132f92d62479948 100644 (file)
@@ -8,15 +8,17 @@
 
 package org.opendaylight.controller.netconf.util.handler;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
 public class NetconfChunkAggregator extends ByteToMessageDecoder {
     private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
     public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
@@ -36,7 +38,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
     private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
     private State state = State.HEADER_ONE;
     private long chunkSize;
-    private ByteBuf chunk;
+    private CompositeByteBuf chunk;
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
@@ -51,6 +53,8 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
                 }
 
                 state = State.HEADER_TWO;
+
+                initChunk();
                 break;
             }
             case HEADER_TWO:
@@ -67,12 +71,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
             case HEADER_LENGTH_FIRST:
             {
                 final byte b = in.readByte();
-                if (b < '1' || b > '9') {
-                    logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
-                    throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
-                }
-
-                chunkSize = b - '0';
+                chunkSize = processHeaderLengthFirst(b);
                 state = State.HEADER_LENGTH_OTHER;
                 break;
             }
@@ -111,7 +110,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
                     return;
                 }
 
-                chunk = in.readBytes((int)chunkSize);
+                aggregateChunks(in.readBytes((int) chunkSize));
                 state = State.FOOTER_ONE;
                 break;
             case FOOTER_ONE:
@@ -123,11 +122,13 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
                 }
 
                 state = State.FOOTER_TWO;
+                chunkSize = 0;
                 break;
             }
             case FOOTER_TWO:
             {
                 final byte b = in.readByte();
+
                 if (b != '#') {
                     logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
                     throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
@@ -139,12 +140,22 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
             case FOOTER_THREE:
             {
                 final byte b = in.readByte();
-                if (b != '#') {
-                    logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+
+                // In this state, either header-of-new-chunk or message-end is expected
+                // Depends on the next character
+
+                if (isHeaderLengthFirst(b)) {
+                    // Extract header length#1 from new chunk
+                    chunkSize = processHeaderLengthFirst(b);
+                    // Proceed with next chunk processing
+                    state = State.HEADER_LENGTH_OTHER;
+                } else if (b == '#') {
+                    state = State.FOOTER_FOUR;
+                } else {
+                    logger.debug("Got byte {} while waiting for {} or {}-{}", b, (byte) '#', (byte) '1', (byte) '9');
                     throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
                 }
 
-                state = State.FOOTER_FOUR;
                 break;
             }
             case FOOTER_FOUR:
@@ -157,7 +168,6 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
 
                 state = State.HEADER_ONE;
                 out.add(chunk);
-                chunkSize = 0;
                 chunk = null;
                 break;
             }
@@ -166,4 +176,28 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
 
         in.discardReadBytes();
     }
+
+    private void initChunk() {
+        chunk = Unpooled.compositeBuffer();
+    }
+
+    private void aggregateChunks(ByteBuf newChunk) {
+        chunk.addComponent(chunk.numComponents(), newChunk);
+
+        // Update writer index, addComponent does not update it
+        chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
+    }
+
+    private static int processHeaderLengthFirst(byte b) {
+        if (isHeaderLengthFirst(b) == false) {
+            logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
+            throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
+        }
+
+        return b - '0';
+    }
+
+    private static boolean isHeaderLengthFirst(byte b) {
+        return b >= '1' && b <= '9';
+    }
 }