From 74e05910ae9f155fe4e0a8ed71686b9425dfbf98 Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Thu, 20 Feb 2014 13:19:03 +0100 Subject: [PATCH] Fix message aggregator for chunked netconf messages. Aggregator did not work with 1 message in multiple chunks. Change-Id: I03cc89c3fc56a52f077fd21f77bd98e47ea358a9 Signed-off-by: Maros Marsalek --- .../util/handler/NetconfChunkAggregator.java | 66 ++++++++++++----- .../handler/NetconfChunkAggregatorTest.java | 71 +++++++++++++++++++ 2 files changed, 121 insertions(+), 16 deletions(-) create mode 100644 opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregatorTest.java diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java index f7045c3b30..219e92c3f1 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java @@ -8,15 +8,17 @@ package org.opendaylight.controller.netconf.util.handler; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + public class NetconfChunkAggregator extends ByteToMessageDecoder { private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class); public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024; @@ -36,7 +38,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE; private State state = State.HEADER_ONE; private long chunkSize; - private ByteBuf chunk; + private CompositeByteBuf chunk; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { @@ -51,6 +53,8 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { } state = State.HEADER_TWO; + + initChunk(); break; } case HEADER_TWO: @@ -67,12 +71,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { case HEADER_LENGTH_FIRST: { final byte b = in.readByte(); - if (b < '1' || b > '9') { - logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9'); - throw new IllegalStateException("Invalid chunk size encountered (byte 0)"); - } - - chunkSize = b - '0'; + chunkSize = processHeaderLengthFirst(b); state = State.HEADER_LENGTH_OTHER; break; } @@ -111,7 +110,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { return; } - chunk = in.readBytes((int)chunkSize); + aggregateChunks(in.readBytes((int) chunkSize)); state = State.FOOTER_ONE; break; case FOOTER_ONE: @@ -123,11 +122,13 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { } state = State.FOOTER_TWO; + chunkSize = 0; break; } case FOOTER_TWO: { final byte b = in.readByte(); + if (b != '#') { logger.debug("Got byte {} while waiting for {}", b, (byte)'#'); throw new IllegalStateException("Malformed chunk footer encountered (byte 1)"); @@ -139,12 +140,22 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { case FOOTER_THREE: { final byte b = in.readByte(); - if (b != '#') { - logger.debug("Got byte {} while waiting for {}", b, (byte)'#'); + + // In this state, either header-of-new-chunk or message-end is expected + // Depends on the next character + + if (isHeaderLengthFirst(b)) { + // Extract header length#1 from new chunk + chunkSize = processHeaderLengthFirst(b); + // Proceed with next chunk processing + state = State.HEADER_LENGTH_OTHER; + } else if (b == '#') { + state = State.FOOTER_FOUR; + } else { + logger.debug("Got byte {} while waiting for {} or {}-{}", b, (byte) '#', (byte) '1', (byte) '9'); throw new IllegalStateException("Malformed chunk footer encountered (byte 2)"); } - state = State.FOOTER_FOUR; break; } case FOOTER_FOUR: @@ -157,7 +168,6 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { state = State.HEADER_ONE; out.add(chunk); - chunkSize = 0; chunk = null; break; } @@ -166,4 +176,28 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder { in.discardReadBytes(); } + + private void initChunk() { + chunk = Unpooled.compositeBuffer(); + } + + private void aggregateChunks(ByteBuf newChunk) { + chunk.addComponent(chunk.numComponents(), newChunk); + + // Update writer index, addComponent does not update it + chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes()); + } + + private static int processHeaderLengthFirst(byte b) { + if (isHeaderLengthFirst(b) == false) { + logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9'); + throw new IllegalStateException("Invalid chunk size encountered (byte 0)"); + } + + return b - '0'; + } + + private static boolean isHeaderLengthFirst(byte b) { + return b >= '1' && b <= '9'; + } } diff --git a/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregatorTest.java b/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregatorTest.java new file mode 100644 index 0000000000..bdc183579b --- /dev/null +++ b/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregatorTest.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.netconf.util.handler; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import junit.framework.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.List; + +public class NetconfChunkAggregatorTest { + + private static final String CHUNKED_MESSAGE = "\n#4\n" + + "\n" + + " \n" + + "" + + "\n##\n"; + + public static final String EXPECTED_MESSAGE = "\n" + + " \n" + + ""; + + private static final String CHUNKED_MESSAGE_ONE = "\n#101\n" + EXPECTED_MESSAGE + "\n##\n"; + + private static NetconfChunkAggregator agr; + + @BeforeClass + public static void setUp() throws Exception { + agr = new NetconfChunkAggregator(); + } + + @Test + public void testMultipleChunks() throws Exception { + List output = Lists.newArrayList(); + ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE.getBytes(Charsets.UTF_8)); + agr.decode(null, input, output); + + Assert.assertEquals(1, output.size()); + ByteBuf chunk = (ByteBuf) output.get(0); + + Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8)); + } + + @Test + public void testOneChunks() throws Exception { + List output = Lists.newArrayList(); + ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE_ONE.getBytes(Charsets.UTF_8)); + agr.decode(null, input, output); + + Assert.assertEquals(1, output.size()); + ByteBuf chunk = (ByteBuf) output.get(0); + + Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8)); + } + + +} -- 2.36.6