X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Futil%2Fhandler%2FNetconfChunkAggregator.java;h=9f9f4191f7641943fe768712b94c0ec2b3f409b7;hp=cee8e7133a91cf092b829b92f51f79fa99e947f6;hb=3e20a64a21d5b7bced26b03108aedcd025dd8be6;hpb=eac080120972a30ae37566fec20521286e87ef18 diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java index cee8e7133a..9f9f4191f7 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java @@ -8,17 +8,21 @@ package org.opendaylight.controller.netconf.util.handler; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + public class NetconfChunkAggregator extends ByteToMessageDecoder { private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class); + private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}"; + private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}"; public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024; private static enum State { @@ -36,28 +40,47 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE; private State state = State.HEADER_ONE; private long chunkSize; - private ByteBuf chunk; + private CompositeByteBuf chunk; + + private void checkNewLine(byte b,String errorMessage){ + if (b != '\n') { + logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'\n'); + throw new IllegalStateException(errorMessage); + } + } + private void checkHash(byte b,String errorMessage){ + if (b != '#') { + logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'#'); + throw new IllegalStateException(errorMessage); + } + } + + private void checkChunkSize(){ + if (chunkSize > maxChunkSize) { + logger.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize); + throw new IllegalStateException("Maximum chunk size exceeded"); + } + + } @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws IllegalStateException { while (in.isReadable()) { switch (state) { case HEADER_ONE: { final byte b = in.readByte(); - if (b != '\n') { - throw new IllegalStateException("Malformed chunk header encountered (byte 0)"); - } + checkNewLine(b, "Malformed chunk header encountered (byte 0)"); state = State.HEADER_TWO; + + initChunk(); break; } case HEADER_TWO: { final byte b = in.readByte(); - if (b != '#') { - throw new IllegalStateException("Malformed chunk header encountered (byte 1)"); - } + checkHash(b, "Malformed chunk header encountered (byte 1)"); state = State.HEADER_LENGTH_FIRST; break; @@ -65,11 +88,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { case HEADER_LENGTH_FIRST: { final byte b = in.readByte(); - if (b < '1' || b > '9') { - throw new IllegalStateException("Invalid chunk size encountered (byte 0)"); - } - - chunkSize = b - '0'; + chunkSize = processHeaderLengthFirst(b); state = State.HEADER_LENGTH_OTHER; break; } @@ -82,15 +101,13 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { } if (b < '0' || b > '9') { + logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9'); throw new IllegalStateException("Invalid chunk size encountered"); } chunkSize *= 10; chunkSize += b - '0'; - - if (chunkSize > maxChunkSize) { - throw new IllegalStateException("Maximum chunk size exceeded"); - } + checkChunkSize(); break; } case DATA: @@ -105,54 +122,41 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { in.discardReadBytes(); return; } - - chunk = in.readBytes((int)chunkSize); + aggregateChunks(in.readBytes((int) chunkSize)); state = State.FOOTER_ONE; break; case FOOTER_ONE: { final byte b = in.readByte(); - if (b != '\n') { - logger.debug("Got byte {} while waiting for {}", b, (byte)'\n'); - throw new IllegalStateException("Malformed chunk footer encountered (byte 0)"); - } - + checkNewLine(b,"Malformed chunk footer encountered (byte 0)"); state = State.FOOTER_TWO; + chunkSize = 0; break; } case FOOTER_TWO: { final byte b = in.readByte(); - if (b != '#') { - logger.debug("Got byte {} while waiting for {}", b, (byte)'#'); - throw new IllegalStateException("Malformed chunk footer encountered (byte 1)"); - } - + checkHash(b,"Malformed chunk footer encountered (byte 1)"); state = State.FOOTER_THREE; break; } case FOOTER_THREE: { final byte b = in.readByte(); - if (b != '#') { - logger.debug("Got byte {} while waiting for {}", b, (byte)'#'); - throw new IllegalStateException("Malformed chunk footer encountered (byte 2)"); - } - state = State.FOOTER_FOUR; + // In this state, either header-of-new-chunk or message-end is expected + // Depends on the next character + + extractNewChunkOrMessageEnd(b); + break; } case FOOTER_FOUR: { final byte b = in.readByte(); - if (b != '\n') { - logger.debug("Got byte {} while waiting for {}", b, (byte)'\n'); - throw new IllegalStateException("Malformed chunk footer encountered (byte 3)"); - } - + checkNewLine(b,"Malformed chunk footer encountered (byte 3)"); state = State.HEADER_ONE; out.add(chunk); - chunkSize = 0; chunk = null; break; } @@ -161,4 +165,42 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { in.discardReadBytes(); } + + private void extractNewChunkOrMessageEnd(byte b) { + if (isHeaderLengthFirst(b)) { + // Extract header length#1 from new chunk + chunkSize = processHeaderLengthFirst(b); + // Proceed with next chunk processing + state = State.HEADER_LENGTH_OTHER; + } else if (b == '#') { + state = State.FOOTER_FOUR; + } else { + logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte) '#', (byte) '1', (byte) '9'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 2)"); + } + } + + private void initChunk() { + chunk = Unpooled.compositeBuffer(); + } + + private void aggregateChunks(ByteBuf newChunk) { + chunk.addComponent(chunk.numComponents(), newChunk); + + // Update writer index, addComponent does not update it + chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes()); + } + + private static int processHeaderLengthFirst(byte b) { + if (!isHeaderLengthFirst(b)) { + logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'1', (byte)'9'); + throw new IllegalStateException("Invalid chunk size encountered (byte 0)"); + } + + return b - '0'; + } + + private static boolean isHeaderLengthFirst(byte b) { + return b >= '1' && b <= '9'; + } }