From eac080120972a30ae37566fec20521286e87ef18 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Sun, 16 Feb 2014 05:29:51 +0100 Subject: [PATCH] Split message aggregators This splits the EOM aggregator and the chunked encoding aggregator. The chunked encoding can be much more efficient and hardened as it allows for proper stream processing. We do not go the entire way here, just tighten the chunk header parsing by keeping an explicit state machine. We still need to gather the entire message before shifting it, as our downstream expects to see the message in its entirety. Change-Id: I60947b6d018b73581a56adda5212d3482dda61ea Signed-off-by: Robert Varga --- .../netconf/impl/MessageParserTest.java | 15 +- .../util/AbstractChannelInitializer.java | 4 +- .../AbstractNetconfSessionNegotiator.java | 34 ++-- .../handler/EOMFramingMechanismEncoder.java | 2 +- .../util/handler/NetconfChunkAggregator.java | 164 ++++++++++++++++++ ...regator.java => NetconfEOMAggregator.java} | 24 +-- .../handler/NetconfMessageChunkDecoder.java | 69 -------- .../messages/NetconfMessageConstants.java | 9 +- 8 files changed, 203 insertions(+), 118 deletions(-) create mode 100644 opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java rename opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/{NetconfMessageAggregator.java => NetconfEOMAggregator.java} (79%) delete mode 100644 opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java index 0138cf2bcb..8f9d89f1f1 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java @@ -24,8 +24,8 @@ import org.junit.Test; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.util.handler.ChunkedFramingMechanismEncoder; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder; +import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator; +import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder; import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; @@ -48,7 +48,7 @@ public class MessageParserTest { FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK), new NetconfMessageToXMLEncoder(), - new NetconfMessageAggregator(FramingMechanism.CHUNK), new NetconfMessageChunkDecoder(), + new NetconfChunkAggregator(), new NetconfXMLToMessageDecoder()); testChunkChannel.writeOutbound(this.msg); @@ -94,15 +94,14 @@ public class MessageParserTest { public void testEOMFramingMechanismOnPipeline() throws Exception { EmbeddedChannel testChunkChannel = new EmbeddedChannel( FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM), - new NetconfMessageToXMLEncoder(), new NetconfMessageAggregator( - FramingMechanism.EOM), new NetconfXMLToMessageDecoder()); + new NetconfMessageToXMLEncoder(), new NetconfEOMAggregator(), new NetconfXMLToMessageDecoder()); testChunkChannel.writeOutbound(this.msg); ByteBuf recievedOutbound = (ByteBuf) testChunkChannel.readOutbound(); - byte[] eom = new byte[NetconfMessageConstants.endOfMessage.length]; - recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.endOfMessage.length, eom); - assertArrayEquals(NetconfMessageConstants.endOfMessage, eom); + byte[] eom = new byte[NetconfMessageConstants.END_OF_MESSAGE.length]; + recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.END_OF_MESSAGE.length, eom); + assertArrayEquals(NetconfMessageConstants.END_OF_MESSAGE, eom); testChunkChannel.writeInbound(recievedOutbound); NetconfMessage receivedMessage = (NetconfMessage) testChunkChannel.readInbound(); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java index 0910d9403a..48a45845a4 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java @@ -13,8 +13,8 @@ import io.netty.util.concurrent.Promise; import org.opendaylight.controller.netconf.api.NetconfSession; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; +import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfHelloMessageToXMLEncoder; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; @@ -27,7 +27,7 @@ public abstract class AbstractChannelInitializer { public static final String NETCONF_SESSION_NEGOTIATOR = "negotiator"; public void initialize(SocketChannel ch, Promise promise) { - ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfMessageAggregator(FramingMechanism.EOM)); + ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfEOMAggregator()); initializeMessageDecoder(ch); ch.pipeline().addLast(NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM)); initializeMessageEncoder(ch); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java index 9986b82bd8..ea9fc5dce4 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java @@ -8,6 +8,17 @@ package org.opendaylight.controller.netconf.util; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; + import java.util.concurrent.TimeUnit; import org.opendaylight.controller.netconf.api.AbstractNetconfSession; @@ -15,12 +26,11 @@ import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; -import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder; +import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder; import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.slf4j.Logger; @@ -31,23 +41,11 @@ import org.w3c.dom.NodeList; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.Timeout; -import io.netty.util.Timer; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; - public abstract class AbstractNetconfSessionNegotiator

, L extends NetconfSessionListener> extends AbstractSessionNegotiator { private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class); public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; - public static final String CHUNK_DECODER_CHANNEL_HANDLER_KEY = "chunkDecoder"; protected final P sessionPreferences; @@ -162,7 +160,7 @@ extends AbstractSessionNegotiator { } changeState(State.ESTABLISHED); - S session = getSession(sessionListener, channel, (NetconfHelloMessage)netconfMessage); + S session = getSession(sessionListener, channel, netconfMessage); negotiationSuccessful(session); } else { @@ -180,9 +178,7 @@ extends AbstractSessionNegotiator { replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK)); replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR, - new NetconfMessageAggregator(FramingMechanism.CHUNK)); - channel.pipeline().addAfter(AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR, - CHUNK_DECODER_CHANNEL_HANDLER_KEY, new NetconfMessageChunkDecoder()); + new NetconfChunkAggregator()); } private boolean shouldUseChunkFraming(Document doc) { diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java index 383cebb04f..a3efe8a16b 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java @@ -18,6 +18,6 @@ public class EOMFramingMechanismEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { out.writeBytes(msg); - out.writeBytes(NetconfMessageConstants.endOfMessage); + out.writeBytes(NetconfMessageConstants.END_OF_MESSAGE); } } diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java new file mode 100644 index 0000000000..cee8e7133a --- /dev/null +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.util.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetconfChunkAggregator extends ByteToMessageDecoder { + private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class); + public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024; + + private static enum State { + HEADER_ONE, // \n + HEADER_TWO, // # + HEADER_LENGTH_FIRST, // [1-9] + HEADER_LENGTH_OTHER, // [0-9]*\n + DATA, + FOOTER_ONE, // \n + FOOTER_TWO, // # + FOOTER_THREE, // # + FOOTER_FOUR, // \n + } + + private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE; + private State state = State.HEADER_ONE; + private long chunkSize; + private ByteBuf chunk; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + while (in.isReadable()) { + switch (state) { + case HEADER_ONE: + { + final byte b = in.readByte(); + if (b != '\n') { + throw new IllegalStateException("Malformed chunk header encountered (byte 0)"); + } + + state = State.HEADER_TWO; + break; + } + case HEADER_TWO: + { + final byte b = in.readByte(); + if (b != '#') { + throw new IllegalStateException("Malformed chunk header encountered (byte 1)"); + } + + state = State.HEADER_LENGTH_FIRST; + break; + } + case HEADER_LENGTH_FIRST: + { + final byte b = in.readByte(); + if (b < '1' || b > '9') { + throw new IllegalStateException("Invalid chunk size encountered (byte 0)"); + } + + chunkSize = b - '0'; + state = State.HEADER_LENGTH_OTHER; + break; + } + case HEADER_LENGTH_OTHER: + { + final byte b = in.readByte(); + if (b == '\n') { + state = State.DATA; + break; + } + + if (b < '0' || b > '9') { + throw new IllegalStateException("Invalid chunk size encountered"); + } + + chunkSize *= 10; + chunkSize += b - '0'; + + if (chunkSize > maxChunkSize) { + throw new IllegalStateException("Maximum chunk size exceeded"); + } + break; + } + case DATA: + /* + * FIXME: this gathers all data into one big chunk before passing + * it on. Make sure the pipeline can work with partial data + * and then change this piece to pass the data on as it + * comes through. + */ + if (in.readableBytes() < chunkSize) { + logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize); + in.discardReadBytes(); + return; + } + + chunk = in.readBytes((int)chunkSize); + state = State.FOOTER_ONE; + break; + case FOOTER_ONE: + { + final byte b = in.readByte(); + if (b != '\n') { + logger.debug("Got byte {} while waiting for {}", b, (byte)'\n'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 0)"); + } + + state = State.FOOTER_TWO; + break; + } + case FOOTER_TWO: + { + final byte b = in.readByte(); + if (b != '#') { + logger.debug("Got byte {} while waiting for {}", b, (byte)'#'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 1)"); + } + + state = State.FOOTER_THREE; + break; + } + case FOOTER_THREE: + { + final byte b = in.readByte(); + if (b != '#') { + logger.debug("Got byte {} while waiting for {}", b, (byte)'#'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 2)"); + } + + state = State.FOOTER_FOUR; + break; + } + case FOOTER_FOUR: + { + final byte b = in.readByte(); + if (b != '\n') { + logger.debug("Got byte {} while waiting for {}", b, (byte)'\n'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 3)"); + } + + state = State.HEADER_ONE; + out.add(chunk); + chunkSize = 0; + chunk = null; + break; + } + } + } + + in.discardReadBytes(); + } +} diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfEOMAggregator.java similarity index 79% rename from opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java rename to opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfEOMAggregator.java index a2486050f9..9435e6ff73 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfEOMAggregator.java @@ -8,32 +8,24 @@ package org.opendaylight.controller.netconf.util.handler; -import com.google.common.base.Charsets; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; -import org.opendaylight.controller.netconf.util.messages.FramingMechanism; -import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; -public class NetconfMessageAggregator extends ByteToMessageDecoder { - - private final static Logger logger = LoggerFactory.getLogger(NetconfMessageAggregator.class); +import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - private byte[] eom = NetconfMessageConstants.endOfMessage; +import com.google.common.base.Charsets; - public NetconfMessageAggregator(FramingMechanism framingMechanism) { - if (framingMechanism == FramingMechanism.CHUNK) { - eom = NetconfMessageConstants.endOfChunk; - } - } +public class NetconfEOMAggregator extends ByteToMessageDecoder { + private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - int index = indexOfSequence(in, eom); + int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE); if (index == -1) { logger.debug("Message is not complete, read again."); if (logger.isTraceEnabled()) { @@ -43,7 +35,7 @@ public class NetconfMessageAggregator extends ByteToMessageDecoder { ctx.read(); } else { ByteBuf msg = in.readBytes(index); - in.readBytes(eom.length); + in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length); in.discardReadBytes(); logger.debug("Message is complete."); out.add(msg); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java deleted file mode 100644 index 77ef3861d2..0000000000 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.netconf.util.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.List; - -import org.opendaylight.controller.netconf.api.NetconfDeserializerException; -import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Charsets; - -public class NetconfMessageChunkDecoder extends ByteToMessageDecoder { - private final static Logger logger = LoggerFactory.getLogger(NetconfMessageChunkDecoder.class); - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - ByteBuf byteBufMsg = Unpooled.buffer(in.readableBytes()); - int chunkSize = -1; - boolean isParsed = false; - while (in.isReadable()) { - try { - if (!isParsed) { - chunkSize = readHeader(in); - isParsed = true; - } - if (chunkSize != -1 && isParsed) { - in.readBytes(byteBufMsg, chunkSize); - isParsed = false; - } else { - throw new NetconfDeserializerException("Unable to parse chunked data or header."); - } - } catch (Exception e) { - logger.error("Failed to decode chunked message.", e); - this.exceptionCaught(ctx, e); - } - } - out.add(byteBufMsg); - isParsed = false; - } - - private int readHeader(ByteBuf in) { - ByteBuf chunkSize = Unpooled.buffer(NetconfMessageConstants.MIN_HEADER_LENGTH, - NetconfMessageConstants.MAX_HEADER_LENGTH); - byte b = in.readByte(); - if (b != 10) - return -1; - b = in.readByte(); - if (b != 35) - return -1; - while ((b = in.readByte()) != 10) { - chunkSize.writeByte(b); - } - return Integer.parseInt(chunkSize.toString(Charsets.US_ASCII)); - } - -} diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java index 1a08f9063f..c111998e0a 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java @@ -11,12 +11,15 @@ package org.opendaylight.controller.netconf.util.messages; import com.google.common.base.Charsets; public class NetconfMessageConstants { - - public static final byte[] endOfMessage = "]]>]]>".getBytes(Charsets.UTF_8); + /** + * The NETCONF 1.0 old-style message separator. This is framing mechanism + * is used by default. + */ + public static final byte[] END_OF_MESSAGE = "]]>]]>".getBytes(Charsets.UTF_8); public static final byte[] endOfChunk = "\n##\n".getBytes(Charsets.UTF_8); public static final int MIN_HEADER_LENGTH = 4; // bytes public static final int MAX_HEADER_LENGTH = 13; // bytes -} \ No newline at end of file +} -- 2.36.6