public class NetconfChunkAggregator extends ByteToMessageDecoder {
private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
+ private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM = "Got byte {} while waiting for {}";
+ private static final String GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM = "Got byte {} while waiting for {}-{}";
public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
private static enum State {
private long chunkSize;
private CompositeByteBuf chunk;
+ private void checkNewLine(byte b,String errorMessage){
+ if (b != '\n') {
+ logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'\n');
+ throw new IllegalStateException(errorMessage);
+ }
+ }
+
+ private void checkHash(byte b,String errorMessage){
+ if (b != '#') {
+ logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM, b, (byte)'#');
+ throw new IllegalStateException(errorMessage);
+ }
+ }
+
+ private void checkChunkSize(){
+ if (chunkSize > maxChunkSize) {
+ logger.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
+ throw new IllegalStateException("Maximum chunk size exceeded");
+ }
+
+ }
@Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IllegalStateException {
while (in.isReadable()) {
switch (state) {
case HEADER_ONE:
{
final byte b = in.readByte();
- if (b != '\n') {
- logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
- throw new IllegalStateException("Malformed chunk header encountered (byte 0)");
- }
+ checkNewLine(b, "Malformed chunk header encountered (byte 0)");
state = State.HEADER_TWO;
case HEADER_TWO:
{
final byte b = in.readByte();
- if (b != '#') {
- logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
- throw new IllegalStateException("Malformed chunk header encountered (byte 1)");
- }
+ checkHash(b, "Malformed chunk header encountered (byte 1)");
state = State.HEADER_LENGTH_FIRST;
break;
}
if (b < '0' || b > '9') {
- logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'0', (byte)'9');
+ logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'0', (byte)'9');
throw new IllegalStateException("Invalid chunk size encountered");
}
chunkSize *= 10;
chunkSize += b - '0';
-
- if (chunkSize > maxChunkSize) {
- logger.debug("Parsed chunk size {}, maximum allowed is {}", chunkSize, maxChunkSize);
- throw new IllegalStateException("Maximum chunk size exceeded");
- }
+ checkChunkSize();
break;
}
case DATA:
in.discardReadBytes();
return;
}
-
aggregateChunks(in.readBytes((int) chunkSize));
state = State.FOOTER_ONE;
break;
case FOOTER_ONE:
{
final byte b = in.readByte();
- if (b != '\n') {
- logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
- throw new IllegalStateException("Malformed chunk footer encountered (byte 0)");
- }
-
+ checkNewLine(b,"Malformed chunk footer encountered (byte 0)");
state = State.FOOTER_TWO;
chunkSize = 0;
break;
case FOOTER_TWO:
{
final byte b = in.readByte();
-
- if (b != '#') {
- logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
- throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
- }
-
+ checkHash(b,"Malformed chunk footer encountered (byte 1)");
state = State.FOOTER_THREE;
break;
}
// In this state, either header-of-new-chunk or message-end is expected
// Depends on the next character
- if (isHeaderLengthFirst(b)) {
- // Extract header length#1 from new chunk
- chunkSize = processHeaderLengthFirst(b);
- // Proceed with next chunk processing
- state = State.HEADER_LENGTH_OTHER;
- } else if (b == '#') {
- state = State.FOOTER_FOUR;
- } else {
- logger.debug("Got byte {} while waiting for {} or {}-{}", b, (byte) '#', (byte) '1', (byte) '9');
- throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
- }
+ extractNewChunkOrMessageEnd(b);
break;
}
case FOOTER_FOUR:
{
final byte b = in.readByte();
- if (b != '\n') {
- logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
- throw new IllegalStateException("Malformed chunk footer encountered (byte 3)");
- }
-
+ checkNewLine(b,"Malformed chunk footer encountered (byte 3)");
state = State.HEADER_ONE;
out.add(chunk);
chunk = null;
in.discardReadBytes();
}
+ private void extractNewChunkOrMessageEnd(byte b) {
+ if (isHeaderLengthFirst(b)) {
+ // Extract header length#1 from new chunk
+ chunkSize = processHeaderLengthFirst(b);
+ // Proceed with next chunk processing
+ state = State.HEADER_LENGTH_OTHER;
+ } else if (b == '#') {
+ state = State.FOOTER_FOUR;
+ } else {
+ logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte) '#', (byte) '1', (byte) '9');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
+ }
+ }
+
private void initChunk() {
chunk = Unpooled.compositeBuffer();
}
}
private static int processHeaderLengthFirst(byte b) {
- if (isHeaderLengthFirst(b) == false) {
- logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
+ if (!isHeaderLengthFirst(b)) {
+ logger.debug(GOT_PARAM_WHILE_WAITING_FOR_PARAM_PARAM, b, (byte)'1', (byte)'9');
throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
}