Fix message aggregator for chunked netconf messages. 23/5423/3
authorMaros Marsalek <mmarsale@cisco.com>
Thu, 20 Feb 2014 12:19:03 +0000 (13:19 +0100)
committerMaros Marsalek <mmarsale@cisco.com>
Fri, 21 Feb 2014 08:40:09 +0000 (09:40 +0100)
Aggregator did not work with 1 message in multiple chunks.

Change-Id: I03cc89c3fc56a52f077fd21f77bd98e47ea358a9
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java
opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregatorTest.java [new file with mode: 0644]

index f7045c3..219e92c 100644 (file)
@@ -8,15 +8,17 @@
 
 package org.opendaylight.controller.netconf.util.handler;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
 import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
 public class NetconfChunkAggregator extends ByteToMessageDecoder {
     private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
     public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
@@ -36,7 +38,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
     private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
     private State state = State.HEADER_ONE;
     private long chunkSize;
-    private ByteBuf chunk;
+    private CompositeByteBuf chunk;
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
@@ -51,6 +53,8 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
                 }
 
                 state = State.HEADER_TWO;
+
+                initChunk();
                 break;
             }
             case HEADER_TWO:
@@ -67,12 +71,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
             case HEADER_LENGTH_FIRST:
             {
                 final byte b = in.readByte();
-                if (b < '1' || b > '9') {
-                    logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
-                    throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
-                }
-
-                chunkSize = b - '0';
+                chunkSize = processHeaderLengthFirst(b);
                 state = State.HEADER_LENGTH_OTHER;
                 break;
             }
@@ -111,7 +110,7 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
                     return;
                 }
 
-                chunk = in.readBytes((int)chunkSize);
+                aggregateChunks(in.readBytes((int) chunkSize));
                 state = State.FOOTER_ONE;
                 break;
             case FOOTER_ONE:
@@ -123,11 +122,13 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
                 }
 
                 state = State.FOOTER_TWO;
+                chunkSize = 0;
                 break;
             }
             case FOOTER_TWO:
             {
                 final byte b = in.readByte();
+
                 if (b != '#') {
                     logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
                     throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
@@ -139,12 +140,22 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
             case FOOTER_THREE:
             {
                 final byte b = in.readByte();
-                if (b != '#') {
-                    logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+
+                // In this state, either header-of-new-chunk or message-end is expected
+                // Depends on the next character
+
+                if (isHeaderLengthFirst(b)) {
+                    // Extract header length#1 from new chunk
+                    chunkSize = processHeaderLengthFirst(b);
+                    // Proceed with next chunk processing
+                    state = State.HEADER_LENGTH_OTHER;
+                } else if (b == '#') {
+                    state = State.FOOTER_FOUR;
+                } else {
+                    logger.debug("Got byte {} while waiting for {} or {}-{}", b, (byte) '#', (byte) '1', (byte) '9');
                     throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
                 }
 
-                state = State.FOOTER_FOUR;
                 break;
             }
             case FOOTER_FOUR:
@@ -157,7 +168,6 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
 
                 state = State.HEADER_ONE;
                 out.add(chunk);
-                chunkSize = 0;
                 chunk = null;
                 break;
             }
@@ -166,4 +176,28 @@ public class NetconfChunkAggregator extends ByteToMessageDecoder {
 
         in.discardReadBytes();
     }
+
+    private void initChunk() {
+        chunk = Unpooled.compositeBuffer();
+    }
+
+    private void aggregateChunks(ByteBuf newChunk) {
+        chunk.addComponent(chunk.numComponents(), newChunk);
+
+        // Update writer index, addComponent does not update it
+        chunk.writerIndex(chunk.writerIndex() + newChunk.readableBytes());
+    }
+
+    private static int processHeaderLengthFirst(byte b) {
+        if (isHeaderLengthFirst(b) == false) {
+            logger.debug("Got byte {} while waiting for {}-{}", b, (byte)'1', (byte)'9');
+            throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
+        }
+
+        return b - '0';
+    }
+
+    private static boolean isHeaderLengthFirst(byte b) {
+        return b >= '1' && b <= '9';
+    }
 }
diff --git a/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregatorTest.java b/opendaylight/netconf/netconf-util/src/test/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregatorTest.java
new file mode 100644 (file)
index 0000000..bdc1835
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.util.handler;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import junit.framework.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+
+public class NetconfChunkAggregatorTest {
+
+    private static final String CHUNKED_MESSAGE = "\n#4\n" +
+            "<rpc" +
+            "\n#18\n" +
+            " message-id=\"102\"\n" +
+            "\n#79\n" +
+            "     xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+            "  <close-session/>\n" +
+            "</rpc>" +
+            "\n##\n";
+
+    public static final String EXPECTED_MESSAGE = "<rpc message-id=\"102\"\n" +
+            "     xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" +
+            "  <close-session/>\n" +
+            "</rpc>";
+
+    private static final String CHUNKED_MESSAGE_ONE = "\n#101\n" + EXPECTED_MESSAGE + "\n##\n";
+
+    private static NetconfChunkAggregator agr;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        agr = new NetconfChunkAggregator();
+    }
+
+    @Test
+    public void testMultipleChunks() throws Exception {
+        List<Object> output = Lists.newArrayList();
+        ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE.getBytes(Charsets.UTF_8));
+        agr.decode(null, input, output);
+
+        Assert.assertEquals(1, output.size());
+        ByteBuf chunk = (ByteBuf) output.get(0);
+
+        Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
+    }
+
+    @Test
+    public void testOneChunks() throws Exception {
+        List<Object> output = Lists.newArrayList();
+        ByteBuf input = Unpooled.copiedBuffer(CHUNKED_MESSAGE_ONE.getBytes(Charsets.UTF_8));
+        agr.decode(null, input, output);
+
+        Assert.assertEquals(1, output.size());
+        ByteBuf chunk = (ByteBuf) output.get(0);
+
+        Assert.assertEquals(EXPECTED_MESSAGE, chunk.toString(Charsets.UTF_8));
+    }
+
+
+}