Split message aggregators 77/5377/1
authorRobert Varga <rovarga@cisco.com>
Sun, 16 Feb 2014 04:29:51 +0000 (05:29 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 18 Feb 2014 05:22:35 +0000 (06:22 +0100)
This splits the EOM aggregator and the chunked encoding aggregator. The
chunked encoding can be much more efficient and hardened as it allows
for proper stream processing.

We do not go the entire way here, just tighten the chunk header parsing
by keeping an explicit state machine. We still need to gather the entire
message before shifting it, as our downstream expects to see the message
in its entirety.

Change-Id: I60947b6d018b73581a56adda5212d3482dda61ea
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java [new file with mode: 0644]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfEOMAggregator.java [moved from opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java with 79% similarity]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java [deleted file]
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java

index 0138cf2bcb09271c32edc96796f03327d342889a..8f9d89f1f10fb86fb37cf03fe9c83a536a2a5a82 100644 (file)
@@ -24,8 +24,8 @@ import org.junit.Test;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.util.handler.ChunkedFramingMechanismEncoder;
 import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator;
+import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator;
 import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
 import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
 import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
@@ -48,7 +48,7 @@ public class MessageParserTest {
                 FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK),
                 new NetconfMessageToXMLEncoder(),
 
-                new NetconfMessageAggregator(FramingMechanism.CHUNK), new NetconfMessageChunkDecoder(),
+                new NetconfChunkAggregator(),
                 new NetconfXMLToMessageDecoder());
 
         testChunkChannel.writeOutbound(this.msg);
@@ -94,15 +94,14 @@ public class MessageParserTest {
     public void testEOMFramingMechanismOnPipeline() throws Exception {
         EmbeddedChannel testChunkChannel = new EmbeddedChannel(
                 FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM),
-                new NetconfMessageToXMLEncoder(), new NetconfMessageAggregator(
-                        FramingMechanism.EOM), new NetconfXMLToMessageDecoder());
+                new NetconfMessageToXMLEncoder(), new NetconfEOMAggregator(), new NetconfXMLToMessageDecoder());
 
         testChunkChannel.writeOutbound(this.msg);
         ByteBuf recievedOutbound = (ByteBuf) testChunkChannel.readOutbound();
 
-        byte[] eom = new byte[NetconfMessageConstants.endOfMessage.length];
-        recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.endOfMessage.length, eom);
-        assertArrayEquals(NetconfMessageConstants.endOfMessage, eom);
+        byte[] eom = new byte[NetconfMessageConstants.END_OF_MESSAGE.length];
+        recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.END_OF_MESSAGE.length, eom);
+        assertArrayEquals(NetconfMessageConstants.END_OF_MESSAGE, eom);
 
         testChunkChannel.writeInbound(recievedOutbound);
         NetconfMessage receivedMessage = (NetconfMessage) testChunkChannel.readInbound();
index 0910d9403ad98af67427d9dc818e1ddcff216878..48a45845a4a6fe7f57a722bc5678d8fc8447b54d 100644 (file)
@@ -13,8 +13,8 @@ import io.netty.util.concurrent.Promise;
 
 import org.opendaylight.controller.netconf.api.NetconfSession;
 import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator;
 import org.opendaylight.controller.netconf.util.handler.NetconfHelloMessageToXMLEncoder;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
 import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder;
 import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
 
@@ -27,7 +27,7 @@ public abstract class AbstractChannelInitializer<S extends NetconfSession> {
     public static final String NETCONF_SESSION_NEGOTIATOR = "negotiator";
 
     public void initialize(SocketChannel ch, Promise<S> promise) {
-        ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfMessageAggregator(FramingMechanism.EOM));
+        ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfEOMAggregator());
         initializeMessageDecoder(ch);
         ch.pipeline().addLast(NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM));
         initializeMessageEncoder(ch);
index 9986b82bd8d6902a084e77a1a11a6421125ccb17..ea9fc5dce49209bc00daa277a93f877864f6d918 100644 (file)
@@ -8,6 +8,17 @@
 
 package org.opendaylight.controller.netconf.util;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
 import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
@@ -15,12 +26,11 @@ import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfSessionListener;
 import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
 import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
-import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator;
 import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
 import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
 import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
 import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
 import org.slf4j.Logger;
@@ -31,23 +41,11 @@ import org.w3c.dom.NodeList;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.Timeout;
-import io.netty.util.Timer;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
 public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences, S extends AbstractNetconfSession<S, L>, L extends NetconfSessionListener<S>>
 extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
 
     private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
     public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
-    public static final String CHUNK_DECODER_CHANNEL_HANDLER_KEY = "chunkDecoder";
 
     protected final P sessionPreferences;
 
@@ -162,7 +160,7 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
             }
 
             changeState(State.ESTABLISHED);
-            S session = getSession(sessionListener, channel, (NetconfHelloMessage)netconfMessage);
+            S session = getSession(sessionListener, channel, netconfMessage);
 
             negotiationSuccessful(session);
         } else {
@@ -180,9 +178,7 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
         replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
                 FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
         replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
-                new NetconfMessageAggregator(FramingMechanism.CHUNK));
-        channel.pipeline().addAfter(AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
-                CHUNK_DECODER_CHANNEL_HANDLER_KEY, new NetconfMessageChunkDecoder());
+                new NetconfChunkAggregator());
     }
 
     private boolean shouldUseChunkFraming(Document doc) {
index 383cebb04f680d2d7a352d1e18a6dcb8f81c3b90..a3efe8a16bd0c541971e7a4c0ca172b83144d3d5 100644 (file)
@@ -18,6 +18,6 @@ public class EOMFramingMechanismEncoder extends MessageToByteEncoder<ByteBuf> {
     @Override
     protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
         out.writeBytes(msg);
-        out.writeBytes(NetconfMessageConstants.endOfMessage);
+        out.writeBytes(NetconfMessageConstants.END_OF_MESSAGE);
     }
 }
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java
new file mode 100644 (file)
index 0000000..cee8e71
--- /dev/null
@@ -0,0 +1,164 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfChunkAggregator extends ByteToMessageDecoder {
+    private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
+    public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
+
+    private static enum State {
+        HEADER_ONE, // \n
+        HEADER_TWO, // #
+        HEADER_LENGTH_FIRST, // [1-9]
+        HEADER_LENGTH_OTHER, // [0-9]*\n
+        DATA,
+        FOOTER_ONE, // \n
+        FOOTER_TWO, // #
+        FOOTER_THREE, // #
+        FOOTER_FOUR, // \n
+    }
+
+    private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
+    private State state = State.HEADER_ONE;
+    private long chunkSize;
+    private ByteBuf chunk;
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        while (in.isReadable()) {
+            switch (state) {
+            case HEADER_ONE:
+            {
+                final byte b = in.readByte();
+                if (b != '\n') {
+                    throw new IllegalStateException("Malformed chunk header encountered (byte 0)");
+                }
+
+                state = State.HEADER_TWO;
+                break;
+            }
+            case HEADER_TWO:
+            {
+                final byte b = in.readByte();
+                if (b != '#') {
+                    throw new IllegalStateException("Malformed chunk header encountered (byte 1)");
+                }
+
+                state = State.HEADER_LENGTH_FIRST;
+                break;
+            }
+            case HEADER_LENGTH_FIRST:
+            {
+                final byte b = in.readByte();
+                if (b < '1' || b > '9') {
+                    throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
+                }
+
+                chunkSize = b - '0';
+                state = State.HEADER_LENGTH_OTHER;
+                break;
+            }
+            case HEADER_LENGTH_OTHER:
+            {
+                final byte b = in.readByte();
+                if (b == '\n') {
+                    state = State.DATA;
+                    break;
+                }
+
+                if (b < '0' || b > '9') {
+                    throw new IllegalStateException("Invalid chunk size encountered");
+                }
+
+                chunkSize *= 10;
+                chunkSize += b - '0';
+
+                if (chunkSize > maxChunkSize) {
+                    throw new IllegalStateException("Maximum chunk size exceeded");
+                }
+                break;
+            }
+            case DATA:
+                /*
+                 * FIXME: this gathers all data into one big chunk before passing
+                 *        it on. Make sure the pipeline can work with partial data
+                 *        and then change this piece to pass the data on as it
+                 *        comes through.
+                 */
+                if (in.readableBytes() < chunkSize) {
+                    logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
+                    in.discardReadBytes();
+                    return;
+                }
+
+                chunk = in.readBytes((int)chunkSize);
+                state = State.FOOTER_ONE;
+                break;
+            case FOOTER_ONE:
+            {
+                final byte b = in.readByte();
+                if (b != '\n') {
+                    logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
+                    throw new IllegalStateException("Malformed chunk footer encountered (byte 0)");
+                }
+
+                state = State.FOOTER_TWO;
+                break;
+            }
+            case FOOTER_TWO:
+            {
+                final byte b = in.readByte();
+                if (b != '#') {
+                    logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+                    throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
+                }
+
+                state = State.FOOTER_THREE;
+                break;
+            }
+            case FOOTER_THREE:
+            {
+                final byte b = in.readByte();
+                if (b != '#') {
+                    logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+                    throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
+                }
+
+                state = State.FOOTER_FOUR;
+                break;
+            }
+            case FOOTER_FOUR:
+            {
+                final byte b = in.readByte();
+                if (b != '\n') {
+                    logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
+                    throw new IllegalStateException("Malformed chunk footer encountered (byte 3)");
+                }
+
+                state = State.HEADER_ONE;
+                out.add(chunk);
+                chunkSize = 0;
+                chunk = null;
+                break;
+            }
+            }
+        }
+
+        in.discardReadBytes();
+    }
+}
@@ -8,32 +8,24 @@
 
 package org.opendaylight.controller.netconf.util.handler;
 
-import com.google.common.base.Charsets;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
-import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
-public class NetconfMessageAggregator extends ByteToMessageDecoder {
-
-    private final static Logger logger = LoggerFactory.getLogger(NetconfMessageAggregator.class);
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-    private byte[] eom = NetconfMessageConstants.endOfMessage;
+import com.google.common.base.Charsets;
 
-    public NetconfMessageAggregator(FramingMechanism framingMechanism) {
-        if (framingMechanism == FramingMechanism.CHUNK) {
-            eom = NetconfMessageConstants.endOfChunk;
-        }
-    }
+public class NetconfEOMAggregator extends ByteToMessageDecoder {
+    private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
 
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
-        int index = indexOfSequence(in, eom);
+        int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
         if (index == -1) {
             logger.debug("Message is not complete, read again.");
             if (logger.isTraceEnabled()) {
@@ -43,7 +35,7 @@ public class NetconfMessageAggregator extends ByteToMessageDecoder {
             ctx.read();
         } else {
             ByteBuf msg = in.readBytes(index);
-            in.readBytes(eom.length);
+            in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
             in.discardReadBytes();
             logger.debug("Message is complete.");
             out.add(msg);
diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java
deleted file mode 100644 (file)
index 77ef386..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.util.handler;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
-import org.opendaylight.controller.netconf.api.NetconfDeserializerException;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-
-public class NetconfMessageChunkDecoder extends ByteToMessageDecoder {
-    private final static Logger logger = LoggerFactory.getLogger(NetconfMessageChunkDecoder.class);
-
-    @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
-        ByteBuf byteBufMsg = Unpooled.buffer(in.readableBytes());
-        int chunkSize = -1;
-        boolean isParsed = false;
-        while (in.isReadable()) {
-            try {
-                if (!isParsed) {
-                    chunkSize = readHeader(in);
-                    isParsed = true;
-                }
-                if (chunkSize != -1 && isParsed) {
-                    in.readBytes(byteBufMsg, chunkSize);
-                    isParsed = false;
-                } else {
-                    throw new NetconfDeserializerException("Unable to parse chunked data or header.");
-                }
-            } catch (Exception e) {
-                logger.error("Failed to decode chunked message.", e);
-                this.exceptionCaught(ctx, e);
-            }
-        }
-        out.add(byteBufMsg);
-        isParsed = false;
-    }
-
-    private int readHeader(ByteBuf in) {
-        ByteBuf chunkSize = Unpooled.buffer(NetconfMessageConstants.MIN_HEADER_LENGTH,
-                NetconfMessageConstants.MAX_HEADER_LENGTH);
-        byte b = in.readByte();
-        if (b != 10)
-            return -1;
-        b = in.readByte();
-        if (b != 35)
-            return -1;
-        while ((b = in.readByte()) != 10) {
-            chunkSize.writeByte(b);
-        }
-        return Integer.parseInt(chunkSize.toString(Charsets.US_ASCII));
-    }
-
-}
index 1a08f9063fdad0d526c483e10d0a83cce3acbb62..c111998e0ae8c271cd7fbd4c22383930a5a411c3 100644 (file)
@@ -11,12 +11,15 @@ package org.opendaylight.controller.netconf.util.messages;
 import com.google.common.base.Charsets;
 
 public class NetconfMessageConstants {
-
-    public static final byte[] endOfMessage = "]]>]]>".getBytes(Charsets.UTF_8);
+    /**
+     * The NETCONF 1.0 old-style message separator. This is framing mechanism
+     * is used by default.
+     */
+    public static final byte[] END_OF_MESSAGE = "]]>]]>".getBytes(Charsets.UTF_8);
 
     public static final byte[] endOfChunk = "\n##\n".getBytes(Charsets.UTF_8);
 
     public static final int MIN_HEADER_LENGTH = 4; // bytes
 
     public static final int MAX_HEADER_LENGTH = 13; // bytes
-}
\ No newline at end of file
+}