Bug 460 - Fix warning throughout netconf subsystem
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / NetconfChunkAggregator.java
index cee8e7133a91cf092b829b92f51f79fa99e947f6..9f9f4191f7641943fe768712b94c0ec2b3f409b7 100644 (file)
@@ -8,17 +8,21 @@
 
 package org.opendaylight.controller.netconf.util.handler;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
 public class NetconfChunkAggregator extends ByteToMessageDecoder {
     private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
+    private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
+    private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
     public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
 
     private static enum State {
@@ -36,28 +40,47 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
     private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
     private State state = State.HEADER_ONE;
     private long chunkSize;
-    private ByteBuf chunk;
+    private CompositeByteBuf chunk;
+
+    private void checkNewLine(byte b,String errorMessage){
+        if (b != '\n') {
+            logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'\n');
+            throw new IllegalStateException(errorMessage);
+        }
+    }
 
+    private void checkHash(byte b,String errorMessage){
+        if (b != '#') {
+            logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'#');
+            throw new IllegalStateException(errorMessage);
+        }
+    }
+
+    private void checkChunkSize(){
+        if (chunkSize > maxChunkSize) {
+            logger.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
+            throw new IllegalStateException("Maximum chunk size exceeded");
+        }
+
+    }
     @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IllegalStateException {
         while (in.isReadable()) {
             switch (state) {
             case HEADER_ONE:
             {
                 final byte b = in.readByte();
-                if (b != '\n') {
-                    throw new IllegalStateException("Malformed chunk header encountered (byte 0)");
-                }
+                checkNewLine(b, "Malformed chunk header encountered (byte 0)");
 
                 state = State.HEADER_TWO;
+
+                initChunk();
                 break;
             }
             case HEADER_TWO:
             {
                 final byte b = in.readByte();
-                if (b != '#') {
-                    throw new IllegalStateException("Malformed chunk header encountered (byte 1)");
-                }
+                checkHash(b, "Malformed chunk header encountered (byte 1)");
 
                 state = State.HEADER_LENGTH_FIRST;
                 break;
@@ -65,11 +88,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
             case HEADER_LENGTH_FIRST:
             {
                 final byte b = in.readByte();
-                if (b < '1' || b > '9') {
-                    throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
-                }
-
-                chunkSize = b - '0';
+                chunkSize = processHeaderLengthFirst(b);
                 state = State.HEADER_LENGTH_OTHER;
                 break;
             }
@@ -82,15 +101,13 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
                 }
 
                 if (b < '0' || b > '9') {
+                    logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
                     throw new IllegalStateException("Invalid chunk size encountered");
                 }
 
                 chunkSize *= 10;
                 chunkSize += b - '0';
-
-                if (chunkSize > maxChunkSize) {
-                    throw new IllegalStateException("Maximum chunk size exceeded");
-                }
+                checkChunkSize();
                 break;
             }
             case DATA:
@@ -105,54 +122,41 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
                     in.discardReadBytes();
                     return;
                 }
-
-                chunk = in.readBytes((int)chunkSize);
+                aggregateChunks(in.readBytes((int) chunkSize));
                 state = State.FOOTER_ONE;
                 break;
             case FOOTER_ONE:
             {
                 final byte b = in.readByte();
-                if (b != '\n') {
-                    logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
-                    throw new IllegalStateException("Malformed chunk footer encountered (byte 0)");
-                }
-
+                checkNewLine(b,"Malformed chunk footer encountered (byte 0)");
                 state = State.FOOTER_TWO;
+                chunkSize = 0;
                 break;
             }
             case FOOTER_TWO:
             {
                 final byte b = in.readByte();
-                if (b != '#') {
-                    logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
-                    throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
-                }
-
+                checkHash(b,"Malformed chunk footer encountered (byte 1)");
                 state = State.FOOTER_THREE;
                 break;
             }
             case FOOTER_THREE:
             {
                 final byte b = in.readByte();
-                if (b != '#') {
-                    logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
-                    throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
-                }
 
-                state = State.FOOTER_FOUR;
+                // In this state, either header-of-new-chunk or message-end is expected
+                // Depends on the next character
+
+                extractNewChunkOrMessageEnd(b);
+
                 break;
             }
             case FOOTER_FOUR:
             {
                 final byte b = in.readByte();
-                if (b != '\n') {
-                    logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
-                    throw new IllegalStateException("Malformed chunk footer encountered (byte 3)");
-                }
-
+                checkNewLine(b,"Malformed chunk footer encountered (byte 3)");
                 state = State.HEADER_ONE;
                 out.add(chunk);
-                chunkSize = 0;
                 chunk = null;
                 break;
             }
@@ -161,4 +165,42 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
 
         in.discardReadBytes();
     }
+
+    private void extractNewChunkOrMessageEnd(byte b) {
+        if (isHeaderLengthFirst(b)) {
+            // Extract header length#1 from new chunk
+            chunkSize = processHeaderLengthFirst(b);
+            // Proceed with next chunk processing
+            state = State.HEADER_LENGTH_OTHER;
+        } else if (b == '#') {
+            state = State.FOOTER_FOUR;
+        } else {
+            logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte) '#', (byte) '1', (byte) '9');
+            throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
+        }
+    }
+
+    private void initChunk() {
+        chunk = Unpooled.compositeBuffer();
+    }
+
+    private void aggregateChunks(ByteBuf newChunk) {
+        chunk.addComponent(chunk.numComponents(), newChunk);
+
+        // Update writer index, addComponent does not update it
+        chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
+    }
+
+    private static int processHeaderLengthFirst(byte b) {
+        if (!isHeaderLengthFirst(b)) {
+            logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'1', (byte)'9');
+            throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
+        }
+
+        return b - '0';
+    }
+
+    private static boolean isHeaderLengthFirst(byte b) {
+        return b >= '1' && b <= '9';
+    }
 }