Reworked Netconf framing mechanism, added chunked framing mechanism. 97/1897/4
authorMilos Fabian <milfabia@cisco.com>
Tue, 15 Oct 2013 07:35:43 +0000 (09:35 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Sun, 20 Oct 2013 21:04:38 +0000 (21:04 +0000)
Chunked framing mechanism http://tools.ietf.org/html/rfc6242#section-4.2

Change-Id: I00c44560836a8df66c33f6c90fa460984c4581fa
Signed-off-by: Milos Fabian <milfabia@cisco.com>
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/ChunkedFramingMechanismEncoder.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/FramingMechanismHandlerFactory.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageFactory.java

index 39f45e0..3b88a23 100644 (file)
@@ -10,82 +10,100 @@ package org.opendaylight.controller.netconf.impl;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
-import java.io.IOException;
+import java.util.Queue;
 
-import javax.xml.parsers.ParserConfigurationException;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
 
-import org.custommonkey.xmlunit.XMLAssert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
 import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageFactory;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
 import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.protocol.framework.DeserializerException;
-import org.opendaylight.protocol.framework.DocumentedException;
-import org.opendaylight.protocol.util.ByteArray;
-import org.w3c.dom.Document;
-import org.xml.sax.SAXException;
+import org.opendaylight.protocol.framework.ProtocolMessageDecoder;
+import org.opendaylight.protocol.framework.ProtocolMessageEncoder;
 
 public class MessageParserTest {
 
-    private NetconfMessageFactory parser = null;
+    private NetconfMessage msg;
+    private NetconfMessageFactory msgFactory = new NetconfMessageFactory();
 
     @Before
-    public void setUp() {
-        this.parser = new NetconfMessageFactory();
+    public void setUp() throws Exception {
+        this.msg = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/getConfig.xml");
     }
 
     @Test
-    public void testPutEOM() throws IOException, SAXException, ParserConfigurationException {
-        final NetconfMessage msg = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/client_hello.xml");
-        final byte[] bytes = this.parser.put(msg);
-        assertArrayEquals(NetconfMessageFactory.endOfMessage, ByteArray.subByte(bytes, bytes.length
-                - NetconfMessageFactory.endOfMessage.length, NetconfMessageFactory.endOfMessage.length));
-    }
+    public void testChunkedFramingMechanismOnPipeline() throws Exception {
+        EmbeddedChannel testChunkChannel = new EmbeddedChannel(
+                FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK),
+                new ProtocolMessageEncoder<NetconfMessage>(msgFactory),
 
-    @Ignore
-    @Test
-    // TODO not working on WINDOWS
-    // arrays first differed at element [4]; expected:<49> but was:<53>
-    // at
-    // org.junit.internal.ComparisonCriteria.arrayEquals(ComparisonCriteria.java:52)
-    public void testPutChunk() throws IOException, SAXException, ParserConfigurationException {
-        final NetconfMessage msg = XmlFileLoader.xmlFileToNetconfMessage("netconfMessages/client_hello.xml");
-        this.parser.setFramingMechanism(FramingMechanism.CHUNK);
-        final byte[] bytes = this.parser.put(msg);
-        final byte[] header = new byte[] { (byte) 0x0a, (byte) 0x23, (byte) 0x32, (byte) 0x31, (byte) 0x31, (byte) 0x0a };
-        assertArrayEquals(header, ByteArray.subByte(bytes, 0, header.length));
-        assertArrayEquals(NetconfMessageFactory.endOfChunk, ByteArray.subByte(bytes, bytes.length
-                - NetconfMessageFactory.endOfChunk.length, NetconfMessageFactory.endOfChunk.length));
-    }
+                new NetconfMessageAggregator(FramingMechanism.CHUNK), new NetconfMessageChunkDecoder(),
+                new ProtocolMessageDecoder<NetconfMessage>(msgFactory));
 
-    @Test
-    public void testParseEOM() throws IOException, SAXException, DeserializerException, DocumentedException,
-            ParserConfigurationException {
-        final Document msg = XmlFileLoader.xmlFileToDocument("netconfMessages/client_hello.xml");
-        final byte[] bytes = this.parser.put(new NetconfMessage(msg));
-        final Document doc = this.parser
-                .parse(ByteArray.subByte(bytes, 0, bytes.length - NetconfMessageFactory.endOfMessage.length))
-                .iterator().next().getDocument();
-        assertEquals(XmlUtil.toString(msg), XmlUtil.toString(doc));
-        XMLAssert.assertXMLEqual(msg, doc);
+        testChunkChannel.writeOutbound(this.msg);
+        Queue<Object> messages = testChunkChannel.outboundMessages();
+        assertFalse(messages.isEmpty());
+
+        int msgLength = msgFactory.put(this.msg).length;
+        int chunkCount = msgLength / NetconfMessageFactory.MAX_CHUNK_SIZE;
+        if ((msgLength % NetconfMessageFactory.MAX_CHUNK_SIZE) != 0) {
+            chunkCount++;
+        }
+        for (int i = 1; i <= chunkCount; i++) {
+            ByteBuf recievedOutbound = (ByteBuf) messages.poll();
+            int exptHeaderLength = NetconfMessageFactory.MAX_CHUNK_SIZE;
+            if (i == chunkCount) {
+                exptHeaderLength = msgLength - (NetconfMessageFactory.MAX_CHUNK_SIZE * (i - 1));
+                byte[] eom = new byte[NetconfMessageFactory.endOfChunk.length];
+                recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageFactory.endOfChunk.length,
+                        eom);
+                assertArrayEquals(NetconfMessageFactory.endOfChunk, eom);
+            }
+
+            byte[] header = new byte[String.valueOf(exptHeaderLength).length() + NetconfMessageHeader.MIN_HEADER_LENGTH
+                    - 1];
+            recievedOutbound.getBytes(0, header);
+            NetconfMessageHeader messageHeader = new NetconfMessageHeader();
+            messageHeader.fromBytes(header);
+            assertEquals(exptHeaderLength, messageHeader.getLength());
+
+            testChunkChannel.writeInbound(recievedOutbound);
+        }
+        assertTrue(messages.isEmpty());
+
+        NetconfMessage receivedMessage = (NetconfMessage) testChunkChannel.readInbound();
+        assertNotNull(receivedMessage);
+        assertTrue(this.msg.getDocument().isEqualNode(receivedMessage.getDocument()));
     }
 
     @Test
-    public void testParseChunk() throws IOException, SAXException, DeserializerException, DocumentedException,
-            ParserConfigurationException {
-        final Document msg = XmlFileLoader.xmlFileToDocument("netconfMessages/client_hello.xml");
-        this.parser.setFramingMechanism(FramingMechanism.CHUNK);
-        final byte[] bytes = this.parser.put(new NetconfMessage(msg));
-        final Document doc = this.parser
-                .parse(ByteArray.subByte(bytes, 6, bytes.length - NetconfMessageFactory.endOfChunk.length - 6))
-                .iterator().next().getDocument();
-        assertEquals(XmlUtil.toString(msg), XmlUtil.toString(doc));
-        XMLAssert.assertXMLEqual(msg, doc);
-    }
+    public void testEOMFramingMechanismOnPipeline() throws Exception {
+        EmbeddedChannel testChunkChannel = new EmbeddedChannel(
+                FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM),
+                new ProtocolMessageEncoder<NetconfMessage>(msgFactory), new NetconfMessageAggregator(
+                        FramingMechanism.EOM), new ProtocolMessageDecoder<NetconfMessage>(msgFactory));
 
+        testChunkChannel.writeOutbound(this.msg);
+        ByteBuf recievedOutbound = (ByteBuf) testChunkChannel.readOutbound();
+
+        byte[] eom = new byte[NetconfMessageFactory.endOfMessage.length];
+        recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageFactory.endOfMessage.length, eom);
+        assertArrayEquals(NetconfMessageFactory.endOfMessage, eom);
+
+        testChunkChannel.writeInbound(recievedOutbound);
+        NetconfMessage receivedMessage = (NetconfMessage) testChunkChannel.readInbound();
+        assertNotNull(receivedMessage);
+        assertTrue(this.msg.getDocument().isEqualNode(receivedMessage.getDocument()));
+    }
 }
index 317a126..5d082c9 100644 (file)
@@ -13,6 +13,9 @@ import javax.net.ssl.SSLEngine;
 
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfSession;
+import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
+import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageFactory;
 import org.opendaylight.protocol.framework.ProtocolHandlerFactory;
 import org.opendaylight.protocol.framework.ProtocolMessageDecoder;
@@ -40,9 +43,10 @@ public abstract class AbstractChannelInitializer {
             initSsl(ch);
         }
 
-        ch.pipeline().addLast("frameDecoder", NetconfMessageFactory.getDelimiterFrameDecoder());
+        ch.pipeline().addLast("aggregator", new NetconfMessageAggregator(FramingMechanism.EOM));
         ch.pipeline().addLast(handlerFactory.getDecoders());
         initializeAfterDecoder(ch, promise);
+        ch.pipeline().addLast("frameEncoder", FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM));
         ch.pipeline().addLast(handlerFactory.getEncoders());
     }
 
index 9069d85..3ed75a1 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.netconf.util;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+
 import io.netty.channel.Channel;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.Timeout;
@@ -18,9 +19,14 @@ import io.netty.util.TimerTask;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
+
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfSession;
 import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
+import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -29,6 +35,7 @@ import org.opendaylight.protocol.framework.SessionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
 
 import java.util.concurrent.TimeUnit;
 
@@ -114,6 +121,14 @@ public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionP
         final Document doc = netconfMessage.getDocument();
 
         if (isHelloMessage(doc)) {
+            if (containsBase11Capability(doc)
+                    && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument())) {
+                channel.pipeline().replace("frameEncoder", "frameEncoder",
+                        FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
+                channel.pipeline().replace("aggregator", "aggregator",
+                        new NetconfMessageAggregator(FramingMechanism.CHUNK));
+                channel.pipeline().addAfter("aggregator", "chunkDecoder", new NetconfMessageChunkDecoder());
+            }
             changeState(State.ESTABLISHED);
             S session = getSession(sessionListener, channel, doc);
             negotiationSuccessful(session);
@@ -144,6 +159,16 @@ public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionP
         this.state = newState;
     }
 
+    private boolean containsBase11Capability(final Document doc) {
+        final NodeList nList = doc.getElementsByTagName("capability");
+        for (int i = 0; i < nList.getLength(); i++) {
+            if (nList.item(i).getTextContent().contains("base:1.1")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private static boolean isStateChangePermitted(State state, State newState) {
         if (state == State.IDLE && newState == State.OPEN_WAIT)
             return true;
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/ChunkedFramingMechanismEncoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/ChunkedFramingMechanismEncoder.java
new file mode 100644 (file)
index 0000000..7f710c9
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.handler;
+
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageFactory;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+public class ChunkedFramingMechanismEncoder extends MessageToByteEncoder<ByteBuf> {
+
+    private final static Logger logger = LoggerFactory.getLogger(ChunkedFramingMechanismEncoder.class);
+
+    private NetconfMessageHeader messageHeader = new NetconfMessageHeader();
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+        while (msg.readableBytes() > NetconfMessageFactory.MAX_CHUNK_SIZE) {
+            ByteBuf chunk = Unpooled.buffer(NetconfMessageFactory.MAX_CHUNK_SIZE);
+            chunk.writeBytes(createChunkHeader(NetconfMessageFactory.MAX_CHUNK_SIZE));
+            chunk.writeBytes(msg.readBytes(NetconfMessageFactory.MAX_CHUNK_SIZE));
+            ctx.write(chunk);
+        }
+        out.writeBytes(createChunkHeader(msg.readableBytes()));
+        out.writeBytes(msg.readBytes(msg.readableBytes()));
+        out.writeBytes(NetconfMessageFactory.endOfChunk);
+        logger.debug("Output message size is {}", out.readableBytes());
+    }
+
+    private ByteBuf createChunkHeader(int chunkSize) {
+        messageHeader.setLength(chunkSize);
+        logger.debug("Chunked data length is {}.", chunkSize);
+        return Unpooled.wrappedBuffer(messageHeader.toBytes());
+    }
+
+}
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java
new file mode 100644 (file)
index 0000000..b8a933a
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.handler;
+
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+public class EOMFramingMechanismEncoder extends MessageToByteEncoder<ByteBuf> {
+
+    private byte[] eom = NetconfMessageFactory.endOfMessage;
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+        out.writeBytes(msg);
+        out.writeBytes(eom);
+    }
+
+}
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/FramingMechanismHandlerFactory.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/FramingMechanismHandlerFactory.java
new file mode 100644 (file)
index 0000000..0d98084
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.handler;
+
+import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+public class FramingMechanismHandlerFactory {
+
+    private final static Logger logger = LoggerFactory.getLogger(FramingMechanismHandlerFactory.class);
+
+    public static MessageToByteEncoder<ByteBuf> createHandler(FramingMechanism framingMechanism) {
+        logger.debug("{} framing mechanism was selected.", framingMechanism);
+        if (framingMechanism == FramingMechanism.EOM) {
+            return new EOMFramingMechanismEncoder();
+        } else {
+            return new ChunkedFramingMechanismEncoder();
+        }
+    }
+}
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java
new file mode 100644 (file)
index 0000000..131c3d7
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.handler;
+
+import java.util.List;
+
+import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+public class NetconfMessageAggregator extends ByteToMessageDecoder {
+
+    private final static Logger logger = LoggerFactory.getLogger(NetconfMessageAggregator.class);
+
+    private byte[] eom = NetconfMessageFactory.endOfMessage;
+
+    public NetconfMessageAggregator(FramingMechanism framingMechanism) {
+        if (framingMechanism == FramingMechanism.CHUNK) {
+            eom = NetconfMessageFactory.endOfChunk;
+        }
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        int index = indexOfSequence(in, eom);
+        if (index == -1) {
+            logger.debug("Message is not complete, read agian.");
+            ctx.read();
+        } else {
+            ByteBuf msg = in.readBytes(index);
+            in.readBytes(eom.length);
+            in.discardReadBytes();
+            logger.debug("Message is complete. {}", msg.readableBytes());
+            out.add(msg);
+        }
+    }
+
+    private int indexOfSequence(ByteBuf in, byte[] sequence) {
+        int index = -1;
+        for (int i = 0; i < in.readableBytes() - sequence.length + 1; i++) {
+            if (in.getByte(i) == sequence[0]) {
+                index = i;
+                for (int j = 1; j < sequence.length; j++) {
+                    if (in.getByte(i + j) != sequence[j]) {
+                        index = -1;
+                        break;
+                    }
+                }
+                if (index != -1) {
+                    return index;
+                }
+            }
+        }
+        return index;
+    }
+
+}
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java
new file mode 100644 (file)
index 0000000..b713fcd
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.handler;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
+import org.opendaylight.protocol.framework.DeserializerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+public class NetconfMessageChunkDecoder extends ByteToMessageDecoder {
+
+    private final static Logger logger = LoggerFactory.getLogger(NetconfMessageChunkDecoder.class);
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        ByteBuf byteBufMsg = Unpooled.buffer(in.readableBytes());
+        int chunkSize = -1;
+        boolean isParsed = false;
+        while (in.isReadable()) {
+            try {
+                if (!isParsed) {
+                    chunkSize = readHeader(in);
+                    isParsed = true;
+                }
+                if (chunkSize != -1 && isParsed) {
+                    in.readBytes(byteBufMsg, chunkSize);
+                    logger.debug("Chunked data of size {} read.", chunkSize);
+                    isParsed = false;
+                } else {
+                    throw new DeserializerException("Unable to parse chunked data or header.");
+                }
+            } catch (Exception e) {
+                logger.debug("Failed to decode chunked message.", e);
+                this.exceptionCaught(ctx, e);
+            }
+        }
+        out.add(byteBufMsg);
+        isParsed = false;
+    }
+
+    private int readHeader(ByteBuf in) {
+        ByteBuf chunkSize = Unpooled.buffer(NetconfMessageHeader.MIN_HEADER_LENGTH,
+                NetconfMessageHeader.MAX_HEADER_LENGTH);
+        byte b = in.readByte();
+        if (b != 10)
+            return -1;
+        b = in.readByte();
+        if (b != 35)
+            return -1;
+        while ((b = in.readByte()) != 10) {
+            chunkSize.writeByte(b);
+        }
+        return Integer.parseInt(chunkSize.toString(Charset.forName("UTF-8")));
+    }
+
+}
index ca3079b..029d2ba 100644 (file)
@@ -11,9 +11,6 @@ package org.opendaylight.controller.netconf.util.messages;
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandler;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import org.opendaylight.controller.netconf.api.NetconfDeserializerException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -44,9 +41,7 @@ public final class NetconfMessageFactory implements ProtocolMessageFactory<Netco
 
     public static final byte[] endOfChunk = "\n##\n".getBytes(Charsets.UTF_8);
 
-    private static final int MAX_CHUNK_SIZE = 1024; // Bytes
-
-    private FramingMechanism framing = FramingMechanism.EOM;
+    public static final int MAX_CHUNK_SIZE = 1024; // Bytes
 
     private final Optional<String> clientId;
 
@@ -58,10 +53,6 @@ public final class NetconfMessageFactory implements ProtocolMessageFactory<Netco
         this.clientId = clientId;
     }
 
-    public static ChannelHandler getDelimiterFrameDecoder() {
-        return new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Unpooled.copiedBuffer(endOfMessage));
-    }
-
     @Override
     public List<NetconfMessage> parse(byte[] bytes) throws DeserializerException, DocumentedException {
         String s = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
@@ -90,52 +81,16 @@ public final class NetconfMessageFactory implements ProtocolMessageFactory<Netco
             Comment comment = netconfMessage.getDocument().createComment("clientId:" + clientId.get());
             netconfMessage.getDocument().appendChild(comment);
         }
-        byte[] bytes = (this.framing == FramingMechanism.EOM) ? this.putEOM(netconfMessage) : this
-                .putChunked(netconfMessage);
+        final ByteBuffer msgBytes = Charsets.UTF_8.encode(xmlToString(netconfMessage.getDocument()));
         String content = xmlToString(netconfMessage.getDocument());
 
         logger.trace("Putting message \n{}", content);
-        return bytes;
-    }
-
-    private byte[] putEOM(NetconfMessage msg) {
-        // create byte buffer from the String XML
-        // all Netconf messages are encoded using UTF-8
-        final ByteBuffer msgBytes = Charsets.UTF_8.encode(xmlToString(msg.getDocument()));
-        final ByteBuffer result = ByteBuffer.allocate(msgBytes.limit() + endOfMessage.length);
-        result.put(msgBytes);
-        // put end of message
-        result.put(endOfMessage);
-        return result.array();
-    }
-
-    private byte[] putChunked(NetconfMessage msg) {
-        final ByteBuffer msgBytes = Charsets.UTF_8.encode(xmlToString(msg.getDocument()));
-        final NetconfMessageHeader h = new NetconfMessageHeader();
-        if (msgBytes.limit() > MAX_CHUNK_SIZE)
-            logger.warn("Netconf message too long, should be split.");
-        h.setLength(msgBytes.limit());
-        final byte[] headerBytes = h.toBytes();
-        final ByteBuffer result = ByteBuffer.allocate(headerBytes.length + msgBytes.limit() + endOfChunk.length);
-        result.put(headerBytes);
-        result.put(msgBytes);
-        result.put(endOfChunk);
-        return result.array();
+        byte[] b = new byte[msgBytes.remaining()];
+        msgBytes.get(b);
+        return b;
     }
 
     private String xmlToString(Document doc) {
         return XmlUtil.toString(doc, false);
     }
-
-    /**
-     * For Hello message the framing is always EOM, but the framing mechanism
-     * may change.
-     *
-     * @param fm
-     *            new framing mechanism
-     */
-    public void setFramingMechanism(final FramingMechanism fm) {
-        logger.debug("Framing mechanism changed to {}", fm);
-        this.framing = fm;
-    }
 }